Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
47b26fb335
|
@ -13,5 +13,6 @@ int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
|
||||||
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
|
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
|
||||||
|
|
||||||
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
|
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
|
||||||
|
SEpSet getEpSet_s(SCorEpSet *pEpSet);
|
||||||
|
|
||||||
#endif // TDENGINE_TEP_H
|
#endif // TDENGINE_TEP_H
|
||||||
|
|
|
@ -371,7 +371,7 @@ typedef struct SColIndex {
|
||||||
int16_t colId; // column id
|
int16_t colId; // column id
|
||||||
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
|
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
|
||||||
int16_t flag; // denote if it is a tag or a normal column
|
int16_t flag; // denote if it is a tag or a normal column
|
||||||
char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1];
|
char name[TSDB_DB_FNAME_LEN];
|
||||||
} SColIndex;
|
} SColIndex;
|
||||||
|
|
||||||
typedef struct SColumnFilterInfo {
|
typedef struct SColumnFilterInfo {
|
||||||
|
@ -518,7 +518,7 @@ typedef struct SRetrieveTableRsp {
|
||||||
} SRetrieveTableRsp;
|
} SRetrieveTableRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
int32_t cacheBlockSize; // MB
|
int32_t cacheBlockSize; // MB
|
||||||
int32_t totalBlocks;
|
int32_t totalBlocks;
|
||||||
|
@ -542,7 +542,7 @@ typedef struct {
|
||||||
} SCreateDbMsg;
|
} SCreateDbMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t totalBlocks;
|
int32_t totalBlocks;
|
||||||
int32_t daysToKeep0;
|
int32_t daysToKeep0;
|
||||||
int32_t daysToKeep1;
|
int32_t daysToKeep1;
|
||||||
|
@ -692,7 +692,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int32_t cacheBlockSize;
|
int32_t cacheBlockSize;
|
||||||
|
@ -719,7 +719,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
|
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
|
||||||
|
|
||||||
|
@ -795,7 +795,7 @@ typedef struct {
|
||||||
} STagData;
|
} STagData;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
|
@ -809,13 +809,13 @@ typedef struct {
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int16_t payloadLen;
|
int16_t payloadLen;
|
||||||
char payload[];
|
char payload[];
|
||||||
} SShowMsg;
|
} SShowMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t numOfVgroup;
|
int32_t numOfVgroup;
|
||||||
int32_t vgid[];
|
int32_t vgid[];
|
||||||
} SCompactMsg;
|
} SCompactMsg;
|
||||||
|
|
|
@ -67,14 +67,14 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
||||||
/**
|
/**
|
||||||
* Get a table's meta data.
|
* Get a table's meta data.
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pRpc (input, rpc object)
|
* @param pTransporter (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
* @param pDBName (input, full db name)
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data.
|
* Force renew a table's local cached meta data.
|
||||||
|
|
|
@ -44,9 +44,12 @@ typedef struct SField {
|
||||||
} SField;
|
} SField;
|
||||||
|
|
||||||
typedef struct SParseBasicCtx {
|
typedef struct SParseBasicCtx {
|
||||||
const char *db;
|
uint64_t requestId;
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
uint64_t requestId;
|
const char *db;
|
||||||
|
void *pTransporter;
|
||||||
|
SEpSet mgmtEpSet;
|
||||||
|
struct SCatalog *pCatalog;
|
||||||
} SParseBasicCtx;
|
} SParseBasicCtx;
|
||||||
|
|
||||||
typedef struct SFieldInfo {
|
typedef struct SFieldInfo {
|
||||||
|
|
|
@ -23,10 +23,7 @@ extern "C" {
|
||||||
#include "parsenodes.h"
|
#include "parsenodes.h"
|
||||||
|
|
||||||
typedef struct SParseContext {
|
typedef struct SParseContext {
|
||||||
SParseBasicCtx ctx;
|
SParseBasicCtx ctx;
|
||||||
void *pRpc;
|
|
||||||
struct SCatalog *pCatalog;
|
|
||||||
const SEpSet *pEpSet;
|
|
||||||
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
||||||
const char *pSql; // sql string
|
const char *pSql; // sql string
|
||||||
size_t sqlLen; // length of the sql string
|
size_t sqlLen; // length of the sql string
|
||||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
#define QUERY_TYPE_MERGE 1
|
#define QUERY_TYPE_MERGE 1
|
||||||
#define QUERY_TYPE_PARTIAL 2
|
#define QUERY_TYPE_PARTIAL 2
|
||||||
|
@ -131,7 +132,7 @@ typedef struct SSubplan {
|
||||||
typedef struct SQueryDag {
|
typedef struct SQueryDag {
|
||||||
uint64_t queryId;
|
uint64_t queryId;
|
||||||
int32_t numOfSubplans;
|
int32_t numOfSubplans;
|
||||||
SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
|
SArray *pSubplans; // SArray*<SArray*<SSubplan*>>. The execution level of subplan, starting from 0.
|
||||||
} SQueryDag;
|
} SQueryDag;
|
||||||
|
|
||||||
struct SQueryNode;
|
struct SQueryNode;
|
||||||
|
@ -165,8 +166,11 @@ void qDestroySubplan(SSubplan* pSubplan);
|
||||||
*/
|
*/
|
||||||
void qDestroyQueryDag(SQueryDag* pDag);
|
void qDestroyQueryDag(SQueryDag* pDag);
|
||||||
|
|
||||||
|
char* qDagToString(const SQueryDag* pDag);
|
||||||
|
SQueryDag* qStringToDag(const char* pStr);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_PLANNER_H_*/
|
#endif /*_TD_PLANNER_H_*/
|
||||||
|
|
|
@ -81,7 +81,7 @@ typedef struct SDBVgroupInfo {
|
||||||
} SDBVgroupInfo;
|
} SDBVgroupInfo;
|
||||||
|
|
||||||
typedef struct SUseDbOutput {
|
typedef struct SUseDbOutput {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
SDBVgroupInfo dbVgroup;
|
SDBVgroupInfo dbVgroup;
|
||||||
} SUseDbOutput;
|
} SUseDbOutput;
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ do { \
|
||||||
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
|
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
|
||||||
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
|
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
|
||||||
|
|
||||||
#define TS_PATH_DELIMITER_LEN 1
|
#define TSDB_NAME_DELIMITER_LEN 1
|
||||||
|
|
||||||
#define TSDB_UNI_LEN 24
|
#define TSDB_UNI_LEN 24
|
||||||
#define TSDB_USER_LEN TSDB_UNI_LEN
|
#define TSDB_USER_LEN TSDB_UNI_LEN
|
||||||
|
@ -165,17 +165,17 @@ do { \
|
||||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_DB_NAME_LEN 65
|
#define TSDB_DB_NAME_LEN 65
|
||||||
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
|
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
|
|
||||||
#define TSDB_FUNC_NAME_LEN 65
|
#define TSDB_FUNC_NAME_LEN 65
|
||||||
#define TSDB_FUNC_COMMENT_LEN 4096
|
#define TSDB_FUNC_COMMENT_LEN 4096
|
||||||
#define TSDB_FUNC_CODE_LEN (65535 - 512)
|
#define TSDB_FUNC_CODE_LEN (65535 - 512)
|
||||||
#define TSDB_FUNC_BUF_SIZE 512
|
#define TSDB_FUNC_BUF_SIZE 512
|
||||||
#define TSDB_FUNC_TYPE_SCALAR 1
|
#define TSDB_FUNC_TYPE_SCALAR 1
|
||||||
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
||||||
|
|
||||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||||
#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
|
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||||
#define TSDB_COL_NAME_LEN 65
|
#define TSDB_COL_NAME_LEN 65
|
||||||
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
||||||
|
|
|
@ -75,16 +75,16 @@ typedef struct SAppInfo {
|
||||||
} SAppInfo;
|
} SAppInfo;
|
||||||
|
|
||||||
typedef struct STscObj {
|
typedef struct STscObj {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
uint32_t connId;
|
uint32_t connId;
|
||||||
uint64_t id; // ref ID returned by taosAddRef
|
uint64_t id; // ref ID returned by taosAddRef
|
||||||
void *pTransporter;
|
void *pTransporter;
|
||||||
pthread_mutex_t mutex; // used to protect the operation on db
|
pthread_mutex_t mutex; // used to protect the operation on db
|
||||||
int32_t numOfReqs; // number of sqlObj from this tscObj
|
int32_t numOfReqs; // number of sqlObj from this tscObj
|
||||||
SAppInstInfo *pAppInfo;
|
SAppInstInfo *pAppInfo;
|
||||||
} STscObj;
|
} STscObj;
|
||||||
|
|
||||||
typedef struct SReqResultInfo {
|
typedef struct SReqResultInfo {
|
||||||
|
|
|
@ -145,15 +145,32 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
||||||
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
|
||||||
SParseContext cxt = {
|
SParseContext cxt = {
|
||||||
.ctx = {.requestId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .db = getConnectionDB(pRequest->pTscObj)},
|
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj), .pTransporter = pTscObj->pTransporter},
|
||||||
.pSql = pRequest->sqlstr,
|
.pSql = pRequest->sqlstr,
|
||||||
.sqlLen = pRequest->sqlLen,
|
.sqlLen = pRequest->sqlLen,
|
||||||
.pMsg = pRequest->msgBuf,
|
.pMsg = pRequest->msgBuf,
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = qParseQuerySql(&cxt, pQuery);
|
cxt.ctx.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
// todo OPT performance
|
||||||
|
char buf[12] = {0};
|
||||||
|
sprintf(buf, "%"PRId64, pTscObj->pAppInfo->clusterId);
|
||||||
|
|
||||||
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
int32_t code = catalogGetHandle(buf, &pCatalog);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tfree(cxt.ctx.db);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
cxt.ctx.pCatalog = pCatalog;
|
||||||
|
code = qParseQuerySql(&cxt, pQuery);
|
||||||
|
|
||||||
tfree(cxt.ctx.db);
|
tfree(cxt.ctx.db);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -165,7 +182,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
|
||||||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
SMsgSendInfo* pSendMsg = buildSendMsgInfoImpl(pRequest);
|
||||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||||
|
|
||||||
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
|
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
|
||||||
|
@ -178,34 +195,34 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCreateTableMsg* pMsg = body->msgInfo.pData;
|
SCreateTableMsg* pMsg = pSendMsg->msgInfo.pData;
|
||||||
|
|
||||||
SName t = {0};
|
SName t = {0};
|
||||||
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
||||||
|
|
||||||
char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
char db[TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
||||||
tNameGetFullDbName(&t, db);
|
tNameGetFullDbName(&t, db);
|
||||||
|
|
||||||
SVgroupInfo info = {0};
|
SVgroupInfo info = {0};
|
||||||
catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
SEpSet ep = {0};
|
SEpSet ep = {0};
|
||||||
ep.inUse = info.inUse;
|
ep.inUse = info.inUse;
|
||||||
ep.numOfEps = info.numOfEps;
|
ep.numOfEps = info.numOfEps;
|
||||||
for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
||||||
ep.port[i] = info.epAddr[i].port;
|
ep.port[i] = info.epAddr[i].port;
|
||||||
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body);
|
asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, pSendMsg);
|
||||||
} else {
|
} else {
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_wait(&pRequest->body.rspSem);
|
tsem_wait(&pRequest->body.rspSem);
|
||||||
destroySendMsgInfo(body);
|
destroySendMsgInfo(pSendMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pFields[i].bytes = pSchema[i].bytes;
|
pFields[i].bytes = pSchema[i].bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
pResInfo->fields = pFields;
|
pResInfo->fields = pFields;
|
||||||
|
@ -295,6 +295,6 @@ void initMsgHandleFp() {
|
||||||
handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
|
handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
|
||||||
handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp;
|
handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp;
|
||||||
handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp;
|
handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp;
|
||||||
handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp;
|
handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp;
|
||||||
handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp;
|
handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp;
|
||||||
}
|
}
|
|
@ -39,3 +39,12 @@ void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet) {
|
||||||
taosCorEndWrite(&pEpSet->version);
|
taosCorEndWrite(&pEpSet->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SEpSet getEpSet_s(SCorEpSet *pEpSet) {
|
||||||
|
SEpSet ep = {0};
|
||||||
|
taosCorBeginRead(&pEpSet->version);
|
||||||
|
ep = pEpSet->epSet;
|
||||||
|
taosCorEndRead(&pEpSet->version);
|
||||||
|
|
||||||
|
return ep;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -110,7 +110,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = snprintf(dst, TSDB_FULL_DB_NAME_LEN, "%d.%s", name->acctId, name->dbname);
|
int32_t len = snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname);
|
||||||
|
|
||||||
size_t tnameLen = strlen(name->tname);
|
size_t tnameLen = strlen(name->tname);
|
||||||
if (tnameLen > 0) {
|
if (tnameLen > 0) {
|
||||||
|
@ -134,10 +134,10 @@ int32_t tNameLen(const SName* name) {
|
||||||
|
|
||||||
if (name->type == TSDB_DB_NAME_T) {
|
if (name->type == TSDB_DB_NAME_T) {
|
||||||
assert(len2 == 0);
|
assert(len2 == 0);
|
||||||
return len + len1 + TS_PATH_DELIMITER_LEN;
|
return len + len1 + TSDB_NAME_DELIMITER_LEN;
|
||||||
} else {
|
} else {
|
||||||
assert(len2 > 0);
|
assert(len2 > 0);
|
||||||
return len + len1 + len2 + TS_PATH_DELIMITER_LEN * 2;
|
return len + len1 + len2 + TSDB_NAME_DELIMITER_LEN * 2;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,8 +171,7 @@ int32_t tNameGetDbName(const SName* name, char* dst) {
|
||||||
|
|
||||||
int32_t tNameGetFullDbName(const SName* name, char* dst) {
|
int32_t tNameGetFullDbName(const SName* name, char* dst) {
|
||||||
assert(name != NULL && dst != NULL);
|
assert(name != NULL && dst != NULL);
|
||||||
snprintf(dst, TSDB_ACCT_ID_LEN + TS_PATH_DELIMITER_LEN + TSDB_DB_NAME_LEN, // there is a over write risk
|
snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname);
|
||||||
"%d.%s", name->acctId, name->dbname);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ typedef struct {
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
char path[PATH_MAX + 20];
|
char path[PATH_MAX + 20];
|
||||||
} SWrapperCfg;
|
} SWrapperCfg;
|
||||||
|
|
||||||
|
@ -319,7 +319,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
|
||||||
dError("failed to read %s since db not found", file);
|
dError("failed to read %s since db not found", file);
|
||||||
goto PRASE_VNODE_OVER;
|
goto PRASE_VNODE_OVER;
|
||||||
}
|
}
|
||||||
tstrncpy(pCfg->db, db->valuestring, TSDB_FULL_DB_NAME_LEN);
|
tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppCfgs = pCfgs;
|
*ppCfgs = pCfgs;
|
||||||
|
@ -569,7 +569,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) {
|
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) {
|
||||||
memcpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
|
||||||
pCfg->dbUid = pCreate->dbUid;
|
pCfg->dbUid = pCreate->dbUid;
|
||||||
pCfg->dropped = 0;
|
pCfg->dropped = 0;
|
||||||
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
|
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
|
||||||
|
|
|
@ -209,7 +209,7 @@ typedef struct {
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_FULL_DB_NAME_LEN];
|
char name[TSDB_DB_FNAME_LEN];
|
||||||
char acct[TSDB_USER_LEN];
|
char acct[TSDB_USER_LEN];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
|
@ -232,7 +232,7 @@ typedef struct {
|
||||||
int32_t version;
|
int32_t version;
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
uint32_t hashEnd;
|
uint32_t hashEnd;
|
||||||
char dbName[TSDB_FULL_DB_NAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
int32_t numOfTimeSeries;
|
int32_t numOfTimeSeries;
|
||||||
|
@ -246,7 +246,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
|
@ -286,7 +286,7 @@ typedef struct {
|
||||||
int32_t payloadLen;
|
int32_t payloadLen;
|
||||||
void *pIter;
|
void *pIter;
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int16_t offset[TSDB_MAX_COLUMNS];
|
int16_t offset[TSDB_MAX_COLUMNS];
|
||||||
int32_t bytes[TSDB_MAX_COLUMNS];
|
int32_t bytes[TSDB_MAX_COLUMNS];
|
||||||
char payload[];
|
char payload[];
|
||||||
|
@ -294,7 +294,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
|
@ -309,7 +309,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct SMnodeMsg {
|
typedef struct SMnodeMsg {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
|
|
|
@ -69,7 +69,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
||||||
if (pRaw == NULL) return NULL;
|
if (pRaw == NULL) return NULL;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_FULL_DB_NAME_LEN)
|
SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN)
|
SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
|
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
|
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
|
||||||
|
@ -116,7 +116,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
||||||
if (pDb == NULL) return NULL;
|
if (pDb == NULL) return NULL;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->name, TSDB_FULL_DB_NAME_LEN)
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->name, TSDB_DB_FNAME_LEN)
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->acct, TSDB_USER_LEN)
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->acct, TSDB_USER_LEN)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
|
||||||
|
@ -353,11 +353,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
||||||
SDbObj dbObj = {0};
|
SDbObj dbObj = {0};
|
||||||
memcpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
|
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
|
||||||
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
|
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
|
||||||
dbObj.createdTime = taosGetTimestampMs();
|
dbObj.createdTime = taosGetTimestampMs();
|
||||||
dbObj.updateTime = dbObj.createdTime;
|
dbObj.updateTime = dbObj.createdTime;
|
||||||
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
|
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_DB_FNAME_LEN);
|
||||||
dbObj.cfgVersion = 1;
|
dbObj.cfgVersion = 1;
|
||||||
dbObj.vgVersion = 1;
|
dbObj.vgVersion = 1;
|
||||||
dbObj.hashMethod = 1;
|
dbObj.hashMethod = 1;
|
||||||
|
@ -891,7 +891,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pRsp->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pRsp->vgVersion = htonl(pDb->vgVersion);
|
pRsp->vgVersion = htonl(pDb->vgVersion);
|
||||||
pRsp->vgNum = htonl(vindex);
|
pRsp->vgNum = htonl(vindex);
|
||||||
pRsp->hashMethod = pDb->hashMethod;
|
pRsp->hashMethod = pDb->hashMethod;
|
||||||
|
|
|
@ -194,7 +194,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
|
||||||
taosIp2String(info.clientIp, ip);
|
taosIp2String(info.clientIp, ip);
|
||||||
|
|
||||||
if (pReq->db[0]) {
|
if (pReq->db[0]) {
|
||||||
snprintf(pMsg->db, TSDB_FULL_DB_NAME_LEN, "%d%s%s", pMsg->acctId, TS_PATH_DELIMITER, pReq->db);
|
snprintf(pMsg->db, TSDB_DB_FNAME_LEN, "%d%s%s", pMsg->acctId, TS_PATH_DELIMITER, pReq->db);
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
|
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||||
|
|
|
@ -62,7 +62,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) {
|
||||||
pShow->pMnode = pMnode;
|
pShow->pMnode = pMnode;
|
||||||
pShow->type = pMsg->type;
|
pShow->type = pMsg->type;
|
||||||
pShow->payloadLen = pMsg->payloadLen;
|
pShow->payloadLen = pMsg->payloadLen;
|
||||||
memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pShow->db, pMsg->db, TSDB_DB_FNAME_LEN);
|
||||||
memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen);
|
memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -76,7 +76,7 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
|
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_FULL_DB_NAME_LEN)
|
SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
|
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
|
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
|
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
|
||||||
|
@ -117,7 +117,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->db, TSDB_FULL_DB_NAME_LEN)
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->db, TSDB_DB_FNAME_LEN)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
|
||||||
|
@ -435,7 +435,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
tstrncpy(stbObj.db, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
stbObj.createdTime = taosGetTimestampMs();
|
stbObj.createdTime = taosGetTimestampMs();
|
||||||
stbObj.updateTime = stbObj.createdTime;
|
stbObj.updateTime = stbObj.createdTime;
|
||||||
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
|
@ -79,7 +79,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_FULL_DB_NAME_LEN);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->uid);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->uid);
|
||||||
|
@ -113,7 +113,7 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_FULL_DB_NAME_LEN);
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->createTime);
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->createTime);
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->updateTime);
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->updateTime);
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid);
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid);
|
||||||
|
@ -348,7 +348,7 @@ static int32_t mndSetCreateTopicUndoActions(SMnode *pMnode, STrans *pTrans, SDbO
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *pCreate, SDbObj *pDb) {
|
||||||
STopicObj topicObj = {0};
|
STopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
tstrncpy(topicObj.db, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
topicObj.createTime = taosGetTimestampMs();
|
topicObj.createTime = taosGetTimestampMs();
|
||||||
topicObj.updateTime = topicObj.createTime;
|
topicObj.updateTime = topicObj.createTime;
|
||||||
topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
|
@ -80,7 +80,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pVgroup->version)
|
SDB_SET_INT32(pRaw, dataPos, pVgroup->version)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin)
|
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd)
|
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
|
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid)
|
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica)
|
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica)
|
||||||
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
|
@ -115,7 +115,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version)
|
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version)
|
||||||
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin)
|
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin)
|
||||||
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd)
|
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd)
|
||||||
SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
|
SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN)
|
||||||
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid)
|
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid)
|
||||||
SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica)
|
SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica)
|
||||||
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
|
@ -172,7 +172,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
|
||||||
|
|
||||||
pCreate->vgId = htonl(pVgroup->vgId);
|
pCreate->vgId = htonl(pVgroup->vgId);
|
||||||
pCreate->dnodeId = htonl(pDnode->id);
|
pCreate->dnodeId = htonl(pDnode->id);
|
||||||
memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pCreate->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pCreate->dbUid = htobe64(pDb->uid);
|
pCreate->dbUid = htobe64(pDb->uid);
|
||||||
pCreate->vgVersion = htonl(pVgroup->version);
|
pCreate->vgVersion = htonl(pVgroup->version);
|
||||||
pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
||||||
|
@ -231,7 +231,7 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
|
||||||
|
|
||||||
pDrop->dnodeId = htonl(pDnode->id);
|
pDrop->dnodeId = htonl(pDnode->id);
|
||||||
pDrop->vgId = htonl(pVgroup->vgId);
|
pDrop->vgId = htonl(pVgroup->vgId);
|
||||||
memcpy(pDrop->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pDrop->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pDrop->dbUid = htobe64(pDb->uid);
|
pDrop->dbUid = htobe64(pDb->uid);
|
||||||
|
|
||||||
return pDrop;
|
return pDrop;
|
||||||
|
@ -294,7 +294,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
||||||
pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
|
pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN);
|
memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pVgroup->dbUid = pDb->uid;
|
pVgroup->dbUid = pDb->uid;
|
||||||
pVgroup->replica = pDb->cfg.replications;
|
pVgroup->replica = pDb->cfg.replications;
|
||||||
|
|
||||||
|
|
|
@ -500,8 +500,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
|
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
|
||||||
|
@ -602,7 +602,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pReq->pTableName) {
|
if (pReq->pTableName) {
|
||||||
char dbName[TSDB_FULL_DB_NAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
||||||
if (tbNum > 0) {
|
if (tbNum > 0) {
|
||||||
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
||||||
|
|
|
@ -39,6 +39,7 @@ typedef struct IndexCache {
|
||||||
int32_t nTerm;
|
int32_t nTerm;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
||||||
|
pthread_mutex_t mtx;
|
||||||
} IndexCache;
|
} IndexCache;
|
||||||
|
|
||||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||||
|
@ -71,7 +72,7 @@ void indexCacheUnRef(IndexCache* cache);
|
||||||
|
|
||||||
void indexCacheDebug(IndexCache* cache);
|
void indexCacheDebug(IndexCache* cache);
|
||||||
|
|
||||||
void indexCacheDestroySkiplist(SSkipList* slt);
|
void indexCacheDestroyImm(IndexCache* cache);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -436,9 +436,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
if (ret != 0) { indexError("faile to write into tindex "); }
|
if (ret != 0) { indexError("faile to write into tindex "); }
|
||||||
}
|
}
|
||||||
// not free later, just put int table cache
|
// not free later, just put int table cache
|
||||||
SSkipList* timm = (SSkipList*)pCache->imm;
|
indexCacheDestroyImm(pCache);
|
||||||
pCache->imm = NULL; // or throw int bg thread
|
|
||||||
indexCacheDestroySkiplist(timm);
|
|
||||||
|
|
||||||
tfileWriteClose(tw);
|
tfileWriteClose(tw);
|
||||||
indexCacheIteratorDestroy(cacheIter);
|
indexCacheIteratorDestroy(cacheIter);
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||||
|
|
||||||
#define CACH_LIMIT 1000000
|
#define MEM_TERM_LIMIT 1000000
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
|
||||||
|
@ -78,6 +78,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||||
cache->index = idx;
|
cache->index = idx;
|
||||||
cache->version = 0;
|
cache->version = 0;
|
||||||
|
|
||||||
|
pthread_mutex_init(&cache->mtx, NULL);
|
||||||
indexCacheRef(cache);
|
indexCacheRef(cache);
|
||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
|
@ -103,13 +104,21 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
}
|
}
|
||||||
|
void indexCacheDestroyImm(IndexCache* cache) {
|
||||||
|
pthread_mutex_lock(&cache->mtx);
|
||||||
|
SSkipList* timm = (SSkipList*)cache->imm;
|
||||||
|
cache->imm = NULL; // or throw int bg thread
|
||||||
|
pthread_mutex_unlock(&cache->mtx);
|
||||||
|
|
||||||
|
indexCacheDestroySkiplist(timm);
|
||||||
|
}
|
||||||
void indexCacheDestroy(void* cache) {
|
void indexCacheDestroy(void* cache) {
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
if (pCache == NULL) { return; }
|
if (pCache == NULL) { return; }
|
||||||
tSkipListDestroy(pCache->mem);
|
tSkipListDestroy(pCache->mem);
|
||||||
tSkipListDestroy(pCache->imm);
|
tSkipListDestroy(pCache->imm);
|
||||||
free(pCache->colName);
|
free(pCache->colName);
|
||||||
|
|
||||||
free(pCache);
|
free(pCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,6 +179,27 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
|
||||||
|
|
||||||
taosScheduleTask(indexQhandle, &schedMsg);
|
taosScheduleTask(indexQhandle, &schedMsg);
|
||||||
}
|
}
|
||||||
|
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
||||||
|
while (true) {
|
||||||
|
if (cache->nTerm < MEM_TERM_LIMIT) {
|
||||||
|
cache->nTerm += 1;
|
||||||
|
break;
|
||||||
|
} else if (cache->imm != NULL) {
|
||||||
|
// TODO: wake up by condition variable
|
||||||
|
pthread_mutex_unlock(&cache->mtx);
|
||||||
|
taosMsleep(50);
|
||||||
|
pthread_mutex_lock(&cache->mtx);
|
||||||
|
} else {
|
||||||
|
cache->imm = cache->mem;
|
||||||
|
cache->mem = indexInternalCacheCreate(cache->type);
|
||||||
|
cache->nTerm = 1;
|
||||||
|
// sched to merge
|
||||||
|
// unref cache in bgwork
|
||||||
|
indexCacheSchedToMerge(cache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) { return -1; }
|
||||||
|
|
||||||
|
@ -188,23 +218,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
ct->uid = uid;
|
ct->uid = uid;
|
||||||
ct->operaType = term->operType;
|
ct->operaType = term->operType;
|
||||||
|
|
||||||
|
// ugly code, refactor later
|
||||||
|
pthread_mutex_lock(&pCache->mtx);
|
||||||
|
indexCacheMakeRoomForWrite(pCache);
|
||||||
tSkipListPut(pCache->mem, (char*)ct);
|
tSkipListPut(pCache->mem, (char*)ct);
|
||||||
pCache->nTerm += 1;
|
pthread_mutex_unlock(&pCache->mtx);
|
||||||
|
|
||||||
if (pCache->nTerm >= CACH_LIMIT) {
|
|
||||||
pCache->nTerm = 0;
|
|
||||||
|
|
||||||
while (pCache->imm != NULL) {
|
|
||||||
// do nothong
|
|
||||||
}
|
|
||||||
|
|
||||||
pCache->imm = pCache->mem;
|
|
||||||
pCache->mem = indexInternalCacheCreate(pCache->type);
|
|
||||||
|
|
||||||
// sched to merge
|
|
||||||
// unref cache int bgwork
|
|
||||||
indexCacheSchedToMerge(pCache);
|
|
||||||
}
|
|
||||||
indexCacheUnRef(pCache);
|
indexCacheUnRef(pCache);
|
||||||
return 0;
|
return 0;
|
||||||
// encode end
|
// encode end
|
||||||
|
|
|
@ -445,6 +445,15 @@ tagitemlist1(A) ::= tagitem1(Y). { A = taosArrayInit(4, sizeof(SToken)); taosArr
|
||||||
|
|
||||||
%type tagitem1 {SToken}
|
%type tagitem1 {SToken}
|
||||||
tagitem1(A) ::= MINUS(X) INTEGER(Y). { A.n = X.n + Y.n; A.type = Y.type; }
|
tagitem1(A) ::= MINUS(X) INTEGER(Y). { A.n = X.n + Y.n; A.type = Y.type; }
|
||||||
|
tagitem1(A) ::= MINUS(X) FLOAT(Y). { A.n = X.n + Y.n; A.type = Y.type; }
|
||||||
|
tagitem1(A) ::= PLUS(X) INTEGER(Y). { A.n = X.n + Y.n; A.type = Y.type; }
|
||||||
|
tagitem1(A) ::= PLUS(X) FLOAT(Y). { A.n = X.n + Y.n; A.type = Y.type; }
|
||||||
|
tagitem1(A) ::= INTEGER(X). { A = X; }
|
||||||
|
tagitem1(A) ::= FLOAT(X). { A = X; }
|
||||||
|
tagitem1(A) ::= STRING(X). { A = X; }
|
||||||
|
tagitem1(A) ::= BOOL(X). { A = X; }
|
||||||
|
tagitem1(A) ::= NULL(X). { A = X; }
|
||||||
|
tagitem1(A) ::= NOW(X). { A = X; }
|
||||||
|
|
||||||
%type tagitemlist {SArray*}
|
%type tagitemlist {SArray*}
|
||||||
%destructor tagitemlist {taosArrayDestroy($$);}
|
%destructor tagitemlist {taosArrayDestroy($$);}
|
||||||
|
|
|
@ -215,6 +215,7 @@
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#define TK_SPACE 300
|
#define TK_SPACE 300
|
||||||
#define TK_COMMENT 301
|
#define TK_COMMENT 301
|
||||||
#define TK_ILLEGAL 302
|
#define TK_ILLEGAL 302
|
||||||
|
|
|
@ -313,7 +313,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tNameExtractFullName(&name, pCreateTableInfo->tagdata.name);
|
code = tNameGetTableName(&name, pCreateTableInfo->tagdata.name);
|
||||||
|
|
||||||
SArray* pValList = pCreateTableInfo->pTagVals;
|
SArray* pValList = pCreateTableInfo->pTagVals;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -323,6 +323,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
||||||
size_t valSize = taosArrayGetSize(pValList);
|
size_t valSize = taosArrayGetSize(pValList);
|
||||||
STableMeta* pSuperTableMeta = NULL;
|
STableMeta* pSuperTableMeta = NULL;
|
||||||
|
|
||||||
|
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(&name, dbName);
|
||||||
|
|
||||||
|
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pCreateTableInfo->tagdata.name, &pSuperTableMeta);
|
||||||
|
|
||||||
// too long tag values will return invalid sql, not be truncated automatically
|
// too long tag values will return invalid sql, not be truncated automatically
|
||||||
SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta);
|
SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||||
STableComInfo tinfo = getTableInfo(pSuperTableMeta);
|
STableComInfo tinfo = getTableInfo(pSuperTableMeta);
|
||||||
|
|
|
@ -162,7 +162,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
|
||||||
strncpy(fullDbName + n, pStname->z, p - pStname->z);
|
strncpy(fullDbName + n, pStname->z, p - pStname->z);
|
||||||
strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
|
strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
|
||||||
} else {
|
} else {
|
||||||
snprintf(fullDbName, TSDB_FULL_DB_NAME_LEN, "%d.%s", pCxt->pComCxt->ctx.acctId, pCxt->pComCxt->ctx.db);
|
snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->pComCxt->ctx.acctId, pCxt->pComCxt->ctx.db);
|
||||||
strncpy(tableName, pStname->z, pStname->n);
|
strncpy(tableName, pStname->z, pStname->n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,12 +170,12 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||||
char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0};
|
char fullDbName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
|
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
|
||||||
CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
|
CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->ctx.pCatalog, pCxt->pComCxt->ctx.pTransporter, &pCxt->pComCxt->ctx.mgmtEpSet, fullDbName, tableName, &pCxt->pTableMeta));
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
CHECK_CODE(catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &vg));
|
CHECK_CODE(catalogGetTableHashVgroup(pCxt->pComCxt->ctx.pCatalog, pCxt->pComCxt->ctx.pTransporter, &pCxt->pComCxt->ctx.mgmtEpSet, fullDbName, tableName, &vg));
|
||||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1918,7 +1918,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
|
||||||
case TAOS_REQ_FROM_HTTP:
|
case TAOS_REQ_FROM_HTTP:
|
||||||
pCtx = pSql->param;
|
pCtx = pSql->param;
|
||||||
if (pCtx && pCtx->db[0] != '\0') {
|
if (pCtx && pCtx->db[0] != '\0') {
|
||||||
char db[TSDB_FULL_DB_NAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
|
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
|
||||||
assert(len <= sizeof(db));
|
assert(len <= sizeof(db));
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -672,7 +672,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t0.z = str + (*i);
|
t0.z = (char*) str + (*i);
|
||||||
*i += t0.n;
|
*i += t0.n;
|
||||||
|
|
||||||
return t0;
|
return t0;
|
||||||
|
|
|
@ -715,7 +715,7 @@ TEST(testCase, show_user_Test) {
|
||||||
ASSERT_EQ(info1.valid, true);
|
ASSERT_EQ(info1.valid, true);
|
||||||
|
|
||||||
SDclStmtInfo output;
|
SDclStmtInfo output;
|
||||||
SParseBasicCtx ct= {.db= "abc", .acctId = 1, .requestId = 1};
|
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL};
|
||||||
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
|
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
@ -736,7 +736,7 @@ TEST(testCase, create_user_Test) {
|
||||||
ASSERT_EQ(isDclSqlStatement(&info1), true);
|
ASSERT_EQ(isDclSqlStatement(&info1), true);
|
||||||
|
|
||||||
SDclStmtInfo output;
|
SDclStmtInfo output;
|
||||||
SParseBasicCtx ct= {.db= "abc", .acctId = 1, .requestId = 1};
|
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"};
|
||||||
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
|
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
|
|
@ -857,3 +857,84 @@ int32_t stringToSubplan(const char* str, SSubplan** subplan) {
|
||||||
*subplan = subplanFromJson(json);
|
*subplan = subplanFromJson(json);
|
||||||
return (NULL == *subplan ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS);
|
return (NULL == *subplan ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cJSON* qDagToJson(const SQueryDag* pDag) {
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
if(pRoot == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pRoot, "numOfSubplans", pDag->numOfSubplans);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "queryId", pDag->queryId);
|
||||||
|
cJSON *pLevels = cJSON_CreateArray();
|
||||||
|
if(pLevels == NULL) {
|
||||||
|
cJSON_Delete(pRoot);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON_AddItemToObject(pRoot, "pSubplans", pLevels);
|
||||||
|
size_t level = taosArrayGetSize(pDag->pSubplans);
|
||||||
|
for(size_t i = 0; i < level; i++) {
|
||||||
|
const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i);
|
||||||
|
size_t num = taosArrayGetSize(pSubplans);
|
||||||
|
cJSON* plansOneLevel = cJSON_CreateArray();
|
||||||
|
if(plansOneLevel == NULL) {
|
||||||
|
cJSON_Delete(pRoot);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON_AddItemToArray(pLevels, plansOneLevel);
|
||||||
|
for(size_t j = 0; j < num; j++) {
|
||||||
|
cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j));
|
||||||
|
if(pSubplan == NULL) {
|
||||||
|
cJSON_Delete(pRoot);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON_AddItemToArray(plansOneLevel, pSubplan);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pRoot;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* qDagToString(const SQueryDag* pDag) {
|
||||||
|
cJSON* pRoot = qDagToJson(pDag);
|
||||||
|
return cJSON_Print(pRoot);
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryDag* qJsonToDag(const cJSON* pRoot) {
|
||||||
|
SQueryDag* pDag = malloc(sizeof(SQueryDag));
|
||||||
|
if(pDag == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "numOfSubplans"));
|
||||||
|
pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "queryId"));
|
||||||
|
pDag->pSubplans = taosArrayInit(0, sizeof(SArray));
|
||||||
|
if (pDag->pSubplans == NULL) {
|
||||||
|
free(pDag);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON* pLevels = cJSON_GetObjectItem(pRoot, "pSubplans");
|
||||||
|
int level = cJSON_GetArraySize(pLevels);
|
||||||
|
for(int i = 0; i < level; i++) {
|
||||||
|
SArray* plansOneLevel = taosArrayInit(0, sizeof(void*));
|
||||||
|
if(plansOneLevel == NULL) {
|
||||||
|
for(int j = 0; j < i; j++) {
|
||||||
|
taosArrayDestroy(taosArrayGetP(pDag->pSubplans, j));
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pDag->pSubplans);
|
||||||
|
free(pDag);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
cJSON* pItem = cJSON_GetArrayItem(pLevels, i);
|
||||||
|
int sz = cJSON_GetArraySize(pItem);
|
||||||
|
for(int j = 0; j < sz; j++) {
|
||||||
|
cJSON* pSubplanJson = cJSON_GetArrayItem(pItem, j);
|
||||||
|
SSubplan* pSubplan = subplanFromJson(pSubplanJson);
|
||||||
|
taosArrayPush(plansOneLevel, &pSubplan);
|
||||||
|
}
|
||||||
|
taosArrayPush(pDag->pSubplans, plansOneLevel);
|
||||||
|
}
|
||||||
|
return pDag;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryDag* qStringToDag(const char* pStr) {
|
||||||
|
cJSON* pRoot = cJSON_Parse(pStr);
|
||||||
|
return qJsonToDag(pRoot);
|
||||||
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ protected:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryDag* reslut() {
|
SQueryDag* result() {
|
||||||
return dag_.get();
|
return dag_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,16 +149,23 @@ TEST_F(PhyPlanTest, tableScanTest) {
|
||||||
pushScan("test", "t1", QNODE_TABLESCAN);
|
pushScan("test", "t1", QNODE_TABLESCAN);
|
||||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||||
explain();
|
explain();
|
||||||
SQueryDag* dag = reslut();
|
SQueryDag* dag = result();
|
||||||
// todo check
|
// todo check
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PhyPlanTest, serializeTest) {
|
||||||
|
pushScan("test", "t1", QNODE_TABLESCAN);
|
||||||
|
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||||
|
SQueryDag* dag = result();
|
||||||
|
cout << qDagToString(dag) << endl;
|
||||||
|
}
|
||||||
|
|
||||||
// select * from supertable
|
// select * from supertable
|
||||||
TEST_F(PhyPlanTest, superTableScanTest) {
|
TEST_F(PhyPlanTest, superTableScanTest) {
|
||||||
pushScan("test", "st1", QNODE_TABLESCAN);
|
pushScan("test", "st1", QNODE_TABLESCAN);
|
||||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||||
explain();
|
explain();
|
||||||
SQueryDag* dag = reslut();
|
SQueryDag* dag = result();
|
||||||
// todo check
|
// todo check
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,6 +173,6 @@ TEST_F(PhyPlanTest, superTableScanTest) {
|
||||||
TEST_F(PhyPlanTest, insertTest) {
|
TEST_F(PhyPlanTest, insertTest) {
|
||||||
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
|
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
|
||||||
explain();
|
explain();
|
||||||
SQueryDag* dag = reslut();
|
SQueryDag* dag = result();
|
||||||
// todo check
|
// todo check
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue