enh(tmq): remove topic out of db
This commit is contained in:
parent
df1ebbc25b
commit
06a370c8de
|
@ -17,6 +17,7 @@
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
|
||||||
static int running = 1;
|
static int running = 1;
|
||||||
|
@ -47,6 +48,7 @@ int32_t init_env() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "use abc1");
|
pRes = taos_query(pConn, "use abc1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
|
@ -58,6 +60,7 @@ int32_t init_env() {
|
||||||
pRes =
|
pRes =
|
||||||
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
|
assert(0);
|
||||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -265,10 +268,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
int code;
|
|
||||||
if (argc > 1) {
|
if (argc > 1) {
|
||||||
printf("env init\n");
|
printf("env init\n");
|
||||||
code = init_env();
|
if (init_env() < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
create_topic();
|
create_topic();
|
||||||
}
|
}
|
||||||
tmq_t* tmq = build_consumer();
|
tmq_t* tmq = build_consumer();
|
||||||
|
|
|
@ -1314,7 +1314,6 @@ typedef struct {
|
||||||
} SMqConsumerLostMsg;
|
} SMqConsumerLostMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t topicNum;
|
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
SArray* topicNames; // SArray<char*>
|
SArray* topicNames; // SArray<char*>
|
||||||
|
@ -1322,22 +1321,27 @@ typedef struct {
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
tlen += taosEncodeString(buf, pReq->cgroup);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->topicNum; i++) {
|
int32_t topicNum = taosArrayGetSize(pReq->topicNames);
|
||||||
|
tlen += taosEncodeFixedI32(buf, topicNum);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < topicNum; i++) {
|
||||||
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
|
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||||
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
|
|
||||||
for (int32_t i = 0; i < pReq->topicNum; i++) {
|
int32_t topicNum;
|
||||||
|
buf = taosDecodeFixedI32(buf, &topicNum);
|
||||||
|
|
||||||
|
pReq->topicNames = taosArrayInit(topicNum, sizeof(void*));
|
||||||
|
for (int32_t i = 0; i < topicNum; i++) {
|
||||||
char* name;
|
char* name;
|
||||||
buf = taosDecodeString(buf, &name);
|
buf = taosDecodeString(buf, &name);
|
||||||
taosArrayPush(pReq->topicNames, &name);
|
taosArrayPush(pReq->topicNames, &name);
|
||||||
|
|
|
@ -506,7 +506,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||||
|
|
||||||
SCMSubscribeReq req;
|
SCMSubscribeReq req;
|
||||||
req.topicNum = sz;
|
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
strcpy(req.cgroup, tmq->groupId);
|
strcpy(req.cgroup, tmq->groupId);
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
|
@ -516,12 +515,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
char* topicName = taosArrayGetP(container, i);
|
char* topicName = taosArrayGetP(container, i);
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
#if 0
|
||||||
char* dbName = getDbOfConnection(tmq->pTscObj);
|
char* dbName = getDbOfConnection(tmq->pTscObj);
|
||||||
if (dbName == NULL) {
|
if (dbName == NULL) {
|
||||||
return TMQ_RESP_ERR__FAIL;
|
return TMQ_RESP_ERR__FAIL;
|
||||||
}
|
}
|
||||||
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
|
#endif
|
||||||
|
tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName));
|
||||||
|
#if 0
|
||||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||||
|
#endif
|
||||||
|
|
||||||
char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||||
if (topicFname == NULL) {
|
if (topicFname == NULL) {
|
||||||
|
@ -539,7 +542,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
||||||
taosArrayPush(tmq->clientTopics, &topic);
|
taosArrayPush(tmq->clientTopics, &topic);
|
||||||
taosArrayPush(req.topicNames, &topicFname);
|
taosArrayPush(req.topicNames, &topicFname);
|
||||||
|
#if 0
|
||||||
taosMemoryFree(dbName);
|
taosMemoryFree(dbName);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
|
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
|
||||||
|
|
|
@ -232,6 +232,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
|
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
@ -241,6 +242,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
|
||||||
|
|
||||||
return mndAcquireDb(pMnode, db);
|
return mndAcquireDb(pMnode, db);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
|
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
|
||||||
int32_t contLen = sizeof(SDDropTopicReq);
|
int32_t contLen = sizeof(SDDropTopicReq);
|
||||||
|
@ -260,15 +262,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pCreate->ast == NULL || pCreate->ast[0] == 0) && pCreate->subscribeDbName[0] == 0) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,7 +382,7 @@ static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
|
||||||
goto CREATE_TOPIC_OVER;
|
goto CREATE_TOPIC_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
|
pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
goto CREATE_TOPIC_OVER;
|
goto CREATE_TOPIC_OVER;
|
||||||
|
|
|
@ -198,7 +198,7 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int
|
||||||
|
|
||||||
static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) {
|
static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) {
|
||||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||||
SName name;
|
SName name;
|
||||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
|
||||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(&name, dbFname);
|
tNameGetFullDbName(&name, dbFname);
|
||||||
|
@ -560,8 +560,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||||
}
|
}
|
||||||
} else if (nodesIsComparisonOp(pOp)) {
|
} else if (nodesIsComparisonOp(pOp)) {
|
||||||
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type ||
|
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
|
||||||
TSDB_DATA_TYPE_BLOB == rdt.type) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||||
}
|
}
|
||||||
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
||||||
|
@ -569,7 +568,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||||
}
|
}
|
||||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||||
} else if (nodesIsJsonOp(pOp)){
|
} else if (nodesIsJsonOp(pOp)) {
|
||||||
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
|
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||||
}
|
}
|
||||||
|
@ -588,7 +587,9 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||||
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
||||||
|
.pRpc = pCxt->pParseCxt->pTransporter,
|
||||||
|
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
||||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
@ -1268,7 +1269,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
|
||||||
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
STranslateContext* pCxt = pContext;
|
STranslateContext* pCxt = pContext;
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
||||||
}
|
}
|
||||||
|
@ -1345,7 +1346,8 @@ static int32_t translateFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) || (NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) {
|
if ((NULL != pSelect->pLimit && pSelect->pLimit->offset < 0) ||
|
||||||
|
(NULL != pSelect->pSlimit && pSelect->pSlimit->offset < 0)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_OFFSET_LESS_ZERO);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_OFFSET_LESS_ZERO);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1438,7 +1440,7 @@ static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* p
|
||||||
SExprNode* pLeftExpr = (SExprNode*)pLeft;
|
SExprNode* pLeftExpr = (SExprNode*)pLeft;
|
||||||
SExprNode* pRightExpr = (SExprNode*)pRight;
|
SExprNode* pRightExpr = (SExprNode*)pRight;
|
||||||
if (!dataTypeEqual(&pLeftExpr->resType, &pRightExpr->resType)) {
|
if (!dataTypeEqual(&pLeftExpr->resType, &pRightExpr->resType)) {
|
||||||
SNode* pRightFunc = NULL;
|
SNode* pRightFunc = NULL;
|
||||||
int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc);
|
int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1650,7 +1652,8 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
|
||||||
(TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
|
(TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
|
||||||
(pKeep2->isDuration &&
|
(pKeep2->isDuration &&
|
||||||
(TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
|
(TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit,
|
||||||
|
pKeep2->unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t daysToKeep0 = getBigintFromValueNode(pKeep0);
|
int32_t daysToKeep0 = getBigintFromValueNode(pKeep0);
|
||||||
|
@ -1659,7 +1662,7 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
|
||||||
if (daysToKeep0 < TSDB_MIN_KEEP || daysToKeep1 < TSDB_MIN_KEEP || daysToKeep2 < TSDB_MIN_KEEP ||
|
if (daysToKeep0 < TSDB_MIN_KEEP || daysToKeep1 < TSDB_MIN_KEEP || daysToKeep2 < TSDB_MIN_KEEP ||
|
||||||
daysToKeep0 > TSDB_MAX_KEEP || daysToKeep1 > TSDB_MAX_KEEP || daysToKeep2 > TSDB_MAX_KEEP) {
|
daysToKeep0 > TSDB_MAX_KEEP || daysToKeep1 > TSDB_MAX_KEEP || daysToKeep2 > TSDB_MAX_KEEP) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_VALUE, daysToKeep0, daysToKeep1, daysToKeep2,
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_KEEP_VALUE, daysToKeep0, daysToKeep1, daysToKeep2,
|
||||||
TSDB_MIN_KEEP, TSDB_MAX_KEEP);
|
TSDB_MIN_KEEP, TSDB_MAX_KEEP);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!((daysToKeep0 <= daysToKeep1) && (daysToKeep1 <= daysToKeep2))) {
|
if (!((daysToKeep0 <= daysToKeep1) && (daysToKeep1 <= daysToKeep2))) {
|
||||||
|
@ -1691,7 +1694,8 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) {
|
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions,
|
||||||
|
bool alter) {
|
||||||
if (NULL == pOptions->pDaysPerFile && NULL == pOptions->pKeep) {
|
if (NULL == pOptions->pDaysPerFile && NULL == pOptions->pKeep) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1699,7 +1703,7 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
|
||||||
int64_t daysToKeep0 = GET_OPTION_VAL(nodesListGetNode(pOptions->pKeep, 0), alter ? -1 : TSDB_DEFAULT_KEEP);
|
int64_t daysToKeep0 = GET_OPTION_VAL(nodesListGetNode(pOptions->pKeep, 0), alter ? -1 : TSDB_DEFAULT_KEEP);
|
||||||
if (alter && (-1 == daysPerFile || -1 == daysToKeep0)) {
|
if (alter && (-1 == daysPerFile || -1 == daysToKeep0)) {
|
||||||
SDbCfgInfo dbCfg;
|
SDbCfgInfo dbCfg;
|
||||||
int32_t code = getDBCfg(pCxt, pDbName, &dbCfg);
|
int32_t code = getDBCfg(pCxt, pDbName, &dbCfg);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1712,7 +1716,8 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions, bool alter) {
|
static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions,
|
||||||
|
bool alter) {
|
||||||
int32_t code =
|
int32_t code =
|
||||||
checkRangeOption(pCxt, "totalBlocks", pOptions->pNumOfBlocks, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
|
checkRangeOption(pCxt, "totalBlocks", pOptions->pNumOfBlocks, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1918,7 +1923,7 @@ static int32_t checTableFactorOption(STranslateContext* pCxt, SValueNode* pVal)
|
||||||
}
|
}
|
||||||
if (pVal->datum.d < TSDB_MIN_DB_FILE_FACTOR || pVal->datum.d > TSDB_MAX_DB_FILE_FACTOR) {
|
if (pVal->datum.d < TSDB_MIN_DB_FILE_FACTOR || pVal->datum.d > TSDB_MAX_DB_FILE_FACTOR) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_F_RANGE_OPTION, "file_factor", pVal->datum.d,
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_F_RANGE_OPTION, "file_factor", pVal->datum.d,
|
||||||
TSDB_MIN_DB_FILE_FACTOR, TSDB_MAX_DB_FILE_FACTOR);
|
TSDB_MIN_DB_FILE_FACTOR, TSDB_MAX_DB_FILE_FACTOR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1945,7 +1950,7 @@ static int32_t checkTableTags(STranslateContext* pCxt, SCreateTableStmt* pStmt)
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
FOREACH(pNode, pStmt->pTags) {
|
FOREACH(pNode, pStmt->pTags) {
|
||||||
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
||||||
if(pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1){
|
if (pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1960,8 +1965,10 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
|
||||||
if (1 != LIST_LENGTH(pFuncs)) {
|
if (1 != LIST_LENGTH(pFuncs)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
|
||||||
}
|
}
|
||||||
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
|
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
|
||||||
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
||||||
|
.pRpc = pCxt->pParseCxt->pTransporter,
|
||||||
|
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
||||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
@ -2016,10 +2023,10 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
|
||||||
typedef struct SSampleAstInfo {
|
typedef struct SSampleAstInfo {
|
||||||
const char* pDbName;
|
const char* pDbName;
|
||||||
const char* pTableName;
|
const char* pTableName;
|
||||||
SNodeList* pFuncs;
|
SNodeList* pFuncs;
|
||||||
SNode* pInterval;
|
SNode* pInterval;
|
||||||
SNode* pOffset;
|
SNode* pOffset;
|
||||||
SNode* pSliding;
|
SNode* pSliding;
|
||||||
STableMeta* pRollupTableMeta;
|
STableMeta* pRollupTableMeta;
|
||||||
} SSampleAstInfo;
|
} SSampleAstInfo;
|
||||||
|
|
||||||
|
@ -2089,8 +2096,8 @@ static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
|
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
|
||||||
char buf[20] = {0};
|
char buf[20] = {0};
|
||||||
int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit);
|
int32_t len = snprintf(buf, sizeof(buf), "%" PRId64 "%c", timeVal, pRetension->freqUnit);
|
||||||
pVal->literal = strndup(buf, len);
|
pVal->literal = strndup(buf, len);
|
||||||
if (NULL == pVal->literal) {
|
if (NULL == pVal->literal) {
|
||||||
nodesDestroyNode(pVal);
|
nodesDestroyNode(pVal);
|
||||||
|
@ -2133,7 +2140,7 @@ static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) {
|
||||||
SNode* pFunc = NULL;
|
SNode* pFunc = NULL;
|
||||||
FOREACH(pFunc, pStmt->pOptions->pFuncs) {
|
FOREACH(pFunc, pStmt->pOptions->pFuncs) {
|
||||||
SNode* pCol = NULL;
|
SNode* pCol = NULL;
|
||||||
bool primaryKey = true;
|
bool primaryKey = true;
|
||||||
FOREACH(pCol, pStmt->pCols) {
|
FOREACH(pCol, pStmt->pCols) {
|
||||||
if (primaryKey) {
|
if (primaryKey) {
|
||||||
primaryKey = false;
|
primaryKey = false;
|
||||||
|
@ -2150,7 +2157,7 @@ static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precision) {
|
static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precision) {
|
||||||
int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags);
|
int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags);
|
||||||
STableMeta* pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + numOfField * sizeof(SSchema));
|
STableMeta* pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + numOfField * sizeof(SSchema));
|
||||||
if (NULL == pMeta) {
|
if (NULL == pMeta) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2161,7 +2168,7 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi
|
||||||
pMeta->tableInfo.numOfColumns = LIST_LENGTH(pStmt->pCols);
|
pMeta->tableInfo.numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
SNode* pCol = NULL;
|
SNode* pCol = NULL;
|
||||||
FOREACH(pCol, pStmt->pCols) {
|
FOREACH(pCol, pStmt->pCols) {
|
||||||
toSchema((SColumnDefNode*)pCol, index + 1, pMeta->schema + index);
|
toSchema((SColumnDefNode*)pCol, index + 1, pMeta->schema + index);
|
||||||
++index;
|
++index;
|
||||||
|
@ -2175,8 +2182,8 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi
|
||||||
return pMeta;
|
return pMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
|
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension,
|
||||||
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) {
|
int8_t precision, SSampleAstInfo* pInfo) {
|
||||||
pInfo->pDbName = pStmt->dbName;
|
pInfo->pDbName = pStmt->dbName;
|
||||||
pInfo->pTableName = pStmt->tableName;
|
pInfo->pTableName = pStmt->tableName;
|
||||||
pInfo->pFuncs = createRollupFuncs(pStmt);
|
pInfo->pFuncs = createRollupFuncs(pStmt);
|
||||||
|
@ -2188,10 +2195,10 @@ static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getRollupAst(STranslateContext* pCxt,
|
static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision,
|
||||||
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) {
|
char** pAst, int32_t* pLen) {
|
||||||
SSampleAstInfo info = {0};
|
SSampleAstInfo info = {0};
|
||||||
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
|
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
||||||
}
|
}
|
||||||
|
@ -2201,17 +2208,17 @@ static int32_t getRollupAst(STranslateContext* pCxt,
|
||||||
|
|
||||||
static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
|
static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
|
||||||
SDbCfgInfo dbCfg = {0};
|
SDbCfgInfo dbCfg = {0};
|
||||||
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
|
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
|
||||||
int32_t num = taosArrayGetSize(dbCfg.pRetensions);
|
int32_t num = taosArrayGetSize(dbCfg.pRetensions);
|
||||||
if (TSDB_CODE_SUCCESS != code || num < 2) {
|
if (TSDB_CODE_SUCCESS != code || num < 2) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
for (int32_t i = 1; i < num; ++i) {
|
for (int32_t i = 1; i < num; ++i) {
|
||||||
SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i);
|
SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i);
|
||||||
STranslateContext cxt = {0};
|
STranslateContext cxt = {0};
|
||||||
initTranslateContext(pCxt->pParseCxt, &cxt);
|
initTranslateContext(pCxt->pParseCxt, &cxt);
|
||||||
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision,
|
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2,
|
||||||
1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
|
1 == i ? &pReq->ast1Len : &pReq->ast2Len);
|
||||||
destroyTranslateContext(&cxt);
|
destroyTranslateContext(&cxt);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
break;
|
break;
|
||||||
|
@ -2245,7 +2252,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
|
||||||
|
|
||||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||||
SMCreateStbReq createReq = {0};
|
SMCreateStbReq createReq = {0};
|
||||||
int32_t code = checkCreateTable(pCxt, pStmt);
|
int32_t code = checkCreateTable(pCxt, pStmt);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCreateStbReq(pCxt, pStmt, &createReq);
|
code = buildCreateStbReq(pCxt, pStmt, &createReq);
|
||||||
}
|
}
|
||||||
|
@ -2289,7 +2296,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
|
||||||
|
|
||||||
static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) {
|
static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableStmt* pStmt) {
|
||||||
SName tableName;
|
SName tableName;
|
||||||
return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pStmt->ignoreNotExists);
|
return doTranslateDropSuperTable(pCxt, toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName),
|
||||||
|
pStmt->ignoreNotExists);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
|
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
|
||||||
|
@ -2421,7 +2429,7 @@ static int32_t nodeTypeToShowType(ENodeType nt) {
|
||||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||||
return TSDB_MGMT_TABLE_QUERIES;
|
return TSDB_MGMT_TABLE_QUERIES;
|
||||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||||
return 0; // todo
|
return 0; // todo
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2463,8 +2471,8 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt
|
||||||
pInfo->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
|
pInfo->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
|
||||||
pInfo->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
|
pInfo->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
|
||||||
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval ||
|
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval ||
|
||||||
(NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) ||
|
(NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) ||
|
||||||
(NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) {
|
(NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2472,7 +2480,7 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt
|
||||||
|
|
||||||
static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
|
static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
|
||||||
SSampleAstInfo info = {0};
|
SSampleAstInfo info = {0};
|
||||||
int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info);
|
int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
||||||
}
|
}
|
||||||
|
@ -2618,9 +2626,9 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
|
||||||
|
|
||||||
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
|
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
|
||||||
SName name;
|
SName name;
|
||||||
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
|
||||||
// tNameGetFullDbName(&name, pReq->name);
|
tNameGetFullDbName(&name, pReq->name);
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);
|
/*tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);*/
|
||||||
pReq->igExists = pStmt->ignoreExists;
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
pReq->withTbName = pStmt->pOptions->withTable;
|
pReq->withTbName = pStmt->pOptions->withTable;
|
||||||
pReq->withSchema = pStmt->pOptions->withSchema;
|
pReq->withSchema = pStmt->pOptions->withSchema;
|
||||||
|
@ -2633,16 +2641,19 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
const char* dbName;
|
||||||
if (NULL != pStmt->pQuery) {
|
if (NULL != pStmt->pQuery) {
|
||||||
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName);
|
dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName;
|
||||||
pCxt->pParseCxt->topicQuery = true;
|
pCxt->pParseCxt->topicQuery = true;
|
||||||
code = translateQuery(pCxt, pStmt->pQuery);
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
strcpy(pReq->subscribeDbName, pStmt->subscribeDbName);
|
dbName = pStmt->subscribeDbName;
|
||||||
}
|
}
|
||||||
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
|
||||||
|
tNameGetFullDbName(&name, pReq->subscribeDbName);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2654,8 +2665,9 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt
|
||||||
|
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList &&
|
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) &&
|
||||||
NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
|
NULL == pSelect->pGroupByList && NULL == pSelect->pLimit && NULL == pSelect->pSlimit &&
|
||||||
|
NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2665,7 +2677,7 @@ static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt
|
||||||
|
|
||||||
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||||
SCMCreateTopicReq createReq = {0};
|
SCMCreateTopicReq createReq = {0};
|
||||||
int32_t code = checkCreateTopic(pCxt, pStmt);
|
int32_t code = checkCreateTopic(pCxt, pStmt);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
|
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
|
||||||
}
|
}
|
||||||
|
@ -2763,7 +2775,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t readFromFile(char* pName, int32_t *len, char **buf) {
|
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
|
||||||
int64_t filesize = 0;
|
int64_t filesize = 0;
|
||||||
if (taosStatFile(pName, &filesize, NULL) < 0) {
|
if (taosStatFile(pName, &filesize, NULL) < 0) {
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -3408,8 +3420,8 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
||||||
|
|
||||||
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
||||||
SKVRowBuilder* pBuilder) {
|
SKVRowBuilder* pBuilder) {
|
||||||
if(pSchema->type == TSDB_DATA_TYPE_JSON){
|
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
|
||||||
if(pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
|
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3420,10 +3432,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
|
||||||
return pCxt->errCode;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pVal->node.resType.type == TSDB_DATA_TYPE_NULL){
|
if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
|
||||||
// todo
|
// todo
|
||||||
}else{
|
} else {
|
||||||
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p), IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p),
|
||||||
|
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3692,7 +3705,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
|
|
||||||
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
|
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||||
STranslateContext cxt = {0};
|
STranslateContext cxt = {0};
|
||||||
int32_t code = initTranslateContext(pParseCxt, &cxt);
|
int32_t code = initTranslateContext(pParseCxt, &cxt);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = fmFuncMgtInit();
|
code = fmFuncMgtInit();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue