other: merge 3.0.
This commit is contained in:
commit
cce87a036f
|
@ -29,7 +29,6 @@ extern "C" {
|
|||
#define SLOW_LOG_TYPE_OTHERS 0x4
|
||||
#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF
|
||||
|
||||
|
||||
// cluster
|
||||
extern char tsFirst[];
|
||||
extern char tsSecond[];
|
||||
|
@ -181,6 +180,7 @@ extern bool tsDisableStream;
|
|||
extern int64_t tsStreamBufferSize;
|
||||
extern int64_t tsCheckpointInterval;
|
||||
extern bool tsFilterScalarMode;
|
||||
extern int32_t tsMaxStreamBackendCache;
|
||||
|
||||
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
|
||||
|
|
|
@ -2009,10 +2009,8 @@ typedef struct {
|
|||
int8_t withMeta;
|
||||
char* sql;
|
||||
char subDbName[TSDB_DB_FNAME_LEN];
|
||||
union {
|
||||
char* ast;
|
||||
char subStbName[TSDB_TABLE_FNAME_LEN];
|
||||
};
|
||||
char* ast;
|
||||
char subStbName[TSDB_TABLE_FNAME_LEN];
|
||||
} SCMCreateTopicReq;
|
||||
|
||||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||
|
@ -2822,6 +2820,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
|
|||
tlen += taosEncodeString(buf, pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
tlen += taosEncodeFixedI64(buf, pReq->suid);
|
||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
@ -2838,6 +2837,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
|||
buf = taosDecodeString(buf, &pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->suid);
|
||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
|
|
@ -354,6 +354,7 @@
|
|||
#define TK_WAL 336
|
||||
|
||||
|
||||
|
||||
#define TK_NK_SPACE 600
|
||||
#define TK_NK_COMMENT 601
|
||||
#define TK_NK_ILLEGAL 602
|
||||
|
|
|
@ -82,6 +82,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
|||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
|
||||
uint64_t id);
|
||||
|
||||
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo);
|
||||
|
||||
/**
|
||||
* set the task Id, usually used by message queue process
|
||||
* @param tinfo
|
||||
|
|
|
@ -364,6 +364,7 @@ typedef struct SCreateTopicStmt {
|
|||
bool ignoreExists;
|
||||
bool withMeta;
|
||||
SNode* pQuery;
|
||||
SNode* pWhere;
|
||||
} SCreateTopicStmt;
|
||||
|
||||
typedef struct SDropTopicStmt {
|
||||
|
|
|
@ -51,6 +51,12 @@ typedef enum {
|
|||
TARGET_TYPE_OTHER,
|
||||
} ETargetType;
|
||||
|
||||
typedef enum {
|
||||
TCOL_TYPE_COLUMN = 1,
|
||||
TCOL_TYPE_TAG,
|
||||
TCOL_TYPE_NONE,
|
||||
} ETableColumnType;
|
||||
|
||||
#define QUERY_POLICY_VNODE 1
|
||||
#define QUERY_POLICY_HYBRID 2
|
||||
#define QUERY_POLICY_QNODE 3
|
||||
|
@ -253,6 +259,7 @@ void destroyQueryExecRes(SExecResult* pRes);
|
|||
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
|
||||
char* parseTagDatatoJson(void* p);
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType);
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
|
||||
void freeVgInfo(SDBVgInfo* vgInfo);
|
||||
|
|
|
@ -60,6 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4;
|
|||
int32_t tsNumOfQnodeFetchThreads = 1;
|
||||
int32_t tsNumOfSnodeStreamThreads = 4;
|
||||
int32_t tsNumOfSnodeWriteThreads = 1;
|
||||
int32_t tsMaxStreamBackendCache = 128; // M
|
||||
|
||||
// sync raft
|
||||
int32_t tsElectInterval = 25 * 1000;
|
||||
|
@ -105,7 +106,7 @@ int32_t tsQueryPolicy = 1;
|
|||
int32_t tsQueryRspPolicy = 0;
|
||||
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
|
||||
bool tsEnableQueryHb = false;
|
||||
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
|
||||
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
|
||||
int32_t tsQuerySmaOptimize = 0;
|
||||
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
||||
bool tsQueryPlannerTrace = false;
|
||||
|
@ -117,8 +118,8 @@ int32_t tsRedirectFactor = 2;
|
|||
int32_t tsRedirectMaxPeriod = 1000;
|
||||
int32_t tsMaxRetryWaitTime = 10000;
|
||||
bool tsUseAdapter = false;
|
||||
int32_t tsMetaCacheMaxSize = -1; // MB
|
||||
int32_t tsSlowLogThreshold = 3; // seconds
|
||||
int32_t tsMetaCacheMaxSize = -1; // MB
|
||||
int32_t tsSlowLogThreshold = 3; // seconds
|
||||
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
|
||||
|
||||
/*
|
||||
|
@ -349,7 +350,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1;
|
||||
|
@ -524,6 +526,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1;
|
||||
|
||||
GRANT_CFG_ADD;
|
||||
return 0;
|
||||
|
@ -781,7 +784,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
||||
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
|
||||
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
|
||||
tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval;
|
||||
tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval;
|
||||
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
|
||||
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
|
||||
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
|
||||
|
@ -902,7 +905,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
|
||||
|
||||
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
|
||||
|
||||
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
|
||||
|
||||
GRANT_CFG_GET;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -4002,11 +4002,16 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
|
|||
if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
|
||||
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
|
||||
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
|
||||
} else {
|
||||
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
||||
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
|
||||
}
|
||||
if (pReq->ast && strlen(pReq->ast) > 0) {
|
||||
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
||||
} else {
|
||||
if (tEncodeI32(&encoder, 0) < 0) return -1;
|
||||
}
|
||||
}
|
||||
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||
|
@ -4032,9 +4037,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
|
|||
if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
|
||||
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
|
||||
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||
if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
|
||||
} else {
|
||||
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||
if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||
if (astLen > 0) {
|
||||
pReq->ast = taosMemoryCalloc(1, astLen + 1);
|
||||
|
@ -4057,7 +4063,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
|
|||
|
||||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
||||
taosMemoryFreeClear(pReq->sql);
|
||||
if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
|
||||
if (TOPIC_SUB_TYPE__DB != pReq->subType) {
|
||||
taosMemoryFreeClear(pReq->ast);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -513,7 +513,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
||||
mError("failed to create topic:%s since %s", pTopic->name, terrstr());
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
if(pPlan){
|
||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||
if (levelNum != 1) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
|
@ -554,7 +570,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
|
||||
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
|
||||
|
||||
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pSubplan) {
|
||||
int32_t msgLen;
|
||||
|
||||
pSubplan->execNode.epSet = pVgEp->epSet;
|
||||
|
|
|
@ -1230,7 +1230,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
|
||||
mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||
pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pTopic->ast == NULL) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
@ -2272,7 +2272,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
}
|
||||
}
|
||||
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pTopic->ast == NULL) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "parser.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_TOPIC_VER_NUMBER 2
|
||||
#define MND_TOPIC_VER_NUMBER 3
|
||||
#define MND_TOPIC_RESERVE_SIZE 64
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
|
||||
|
@ -170,7 +170,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
|
||||
|
||||
if (sver != 1 && sver != 2) {
|
||||
if (sver < 1 || sver > MND_TOPIC_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
|
@ -197,7 +197,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||
if (sver >= 3) {
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||
}
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
|
||||
if (pTopic->sql == NULL) {
|
||||
|
@ -422,6 +424,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||
taosMemoryFree(topicObj.ast);
|
||||
taosMemoryFree(topicObj.sql);
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -429,6 +432,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
if (topicObj.ntbColIds == NULL) {
|
||||
taosMemoryFree(topicObj.ast);
|
||||
taosMemoryFree(topicObj.sql);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -444,6 +448,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
taosMemoryFree(topicObj.ast);
|
||||
taosMemoryFree(topicObj.sql);
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -465,6 +470,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
strcpy(topicObj.stbName, pCreate->subStbName);
|
||||
topicObj.stbUid = pStb->uid;
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
if(pCreate->ast != NULL){
|
||||
qDebugL("topic:%s ast %s", topicObj.name, pCreate->ast);
|
||||
topicObj.ast = taosStrdup(pCreate->ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
}
|
||||
}
|
||||
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||
/*topicObj.ast = NULL;*/
|
||||
|
@ -914,13 +924,12 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
|
||||
if (pStb == NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
taosMemoryFree(schemaJson);
|
||||
return -1;
|
||||
STR_TO_VARSTR(schemaJson, "NULL");
|
||||
mError("mndRetrieveTopic mndAcquireStb null stbName:%s", pTopic->stbName);
|
||||
}else{
|
||||
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
}
|
||||
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
|
||||
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
}else{
|
||||
STR_TO_VARSTR(schemaJson, "NULL");
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
char* qmsg; // SubPlanToString
|
||||
SNode* node;
|
||||
} STqExecTb;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -75,6 +75,8 @@ static void destroyTqHandle(void* data) {
|
|||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
walCloseReader(pData->pWalReader);
|
||||
tqReaderClose(pData->execHandle.pTqReader);
|
||||
taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
|
||||
nodesDestroyNode(pData->execHandle.execTb.node);
|
||||
}
|
||||
if(pData->msg != NULL) {
|
||||
rpcFreeCont(pData->msg->pCont);
|
||||
|
@ -655,7 +657,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
STqHandle tqHandle = {0};
|
||||
pHandle = &tqHandle;
|
||||
|
||||
uint64_t oldConsumerId = pHandle->consumerId;
|
||||
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
pHandle->consumerId = req.newConsumerId;
|
||||
pHandle->epoch = -1;
|
||||
|
@ -700,27 +701,37 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
pHandle->execHandle.execTb.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
|
||||
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
||||
if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
|
||||
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
|
||||
tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
|
||||
pVnode->config.vgId, req.subKey, pHandle->consumerId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
||||
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||
(SSnapContext**)(&handle.sContext));
|
||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
||||
|
||||
SArray* tbUidList = NULL;
|
||||
ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
|
||||
if(ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId);
|
||||
taosArrayDestroy(tbUidList);
|
||||
goto end;
|
||||
}
|
||||
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
|
||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
|
||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||
pHandle->consumerId, oldConsumerId);
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey,
|
||||
pHandle->consumerId);
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
goto end;
|
||||
} else {
|
||||
|
|
|
@ -37,6 +37,9 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
|||
}
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||
if (pHandle->execHandle.execTb.qmsg != NULL){
|
||||
if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
|
||||
}
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
|
@ -64,6 +67,9 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
|||
}
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||
if (!tDecodeIsEnd(pDecoder)){
|
||||
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
|
||||
}
|
||||
}
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
|
@ -337,21 +343,27 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
|
||||
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
||||
if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
|
||||
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
|
||||
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
||||
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||
|
||||
SArray* tbUidList = NULL;
|
||||
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
|
||||
if(ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
|
||||
taosArrayDestroy(tbUidList);
|
||||
goto end;
|
||||
}
|
||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
||||
taosWLockLatch(&pTq->lock);
|
||||
|
|
|
@ -1086,34 +1086,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
}
|
||||
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (isAdd) {
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
|
||||
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
|
||||
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
continue;
|
||||
}
|
||||
|
||||
tDecoderClear(&mr.coder);
|
||||
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) {
|
||||
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
|
||||
continue;
|
||||
}
|
||||
|
||||
tqDebug("table uid %" PRId64 " add to tq handle", *id);
|
||||
taosArrayPush(qa, id);
|
||||
SArray* list = NULL;
|
||||
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
|
||||
if(ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
|
||||
taosArrayDestroy(list);
|
||||
return ret;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
if (taosArrayGetSize(qa) > 0) {
|
||||
tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa);
|
||||
}
|
||||
|
||||
taosArrayDestroy(qa);
|
||||
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
|
||||
taosArrayDestroy(list);
|
||||
} else {
|
||||
tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
|
||||
}
|
||||
|
|
|
@ -960,6 +960,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
|||
SArray* pBlockList = NULL;
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
SScalarParam output = {0};
|
||||
SArray* pUidTagList = NULL;
|
||||
|
||||
tagFilterAssist ctx = {0};
|
||||
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||
|
@ -979,7 +980,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
|||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
|
||||
// int64_t stt = taosGetTimestampUs();
|
||||
SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
||||
pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
||||
copyExistedUids(pUidTagList, pUidList);
|
||||
|
||||
FilterCondType condType = checkTagCond(pTagCond);
|
||||
|
@ -1151,6 +1152,21 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){
|
||||
SSubplan *pSubplan = (SSubplan *)node;
|
||||
SScanPhysiNode pNode = {0};
|
||||
pNode.suid = suid;
|
||||
pNode.uid = suid;
|
||||
pNode.tableType = TSDB_SUPER_TABLE;
|
||||
STableListInfo* pTableListInfo = tableListCreate();
|
||||
uint8_t digest[17] = {0};
|
||||
int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
|
||||
*tableList = pTableListInfo->pTableList;
|
||||
pTableListInfo->pTableList = NULL;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t getTableTagsBufLen(const SNodeList* pGroups) {
|
||||
size_t keyLen = 0;
|
||||
|
||||
|
|
|
@ -1002,9 +1002,10 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pBlock = pInfo->pSrcBlock;
|
||||
uint64_t groupId = pBlock->info.id.groupId;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
||||
pRes->info.id.groupId = groupId;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
||||
pInfo->srcRowIndex++;
|
||||
|
||||
if (pInfo->srcRowIndex == 0) {
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
|
@ -1242,7 +1243,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* fillResult = NULL;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows) {
|
||||
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
|
||||
// If there are delete datablocks, we receive them first.
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -1281,7 +1282,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
case STREAM_PULL_DATA: {
|
||||
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
|
||||
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||
pInfo->srcRowIndex = 0;
|
||||
pInfo->srcRowIndex = -1;
|
||||
} break;
|
||||
case STREAM_CREATE_CHILD_TABLE: {
|
||||
return pBlock;
|
||||
|
@ -1497,7 +1498,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->srcRowIndex = 0;
|
||||
pInfo->srcRowIndex = -1;
|
||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
|
|
|
@ -1884,7 +1884,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pRecoverRes != NULL) {
|
||||
pInfo->blockRecoverContiCnt++;
|
||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
||||
if (pInfo->pUpdateInfo) {
|
||||
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
|
|
|
@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
|
|||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->windowSup.parentType = type;
|
||||
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
||||
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
|
||||
}
|
||||
|
||||
|
@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
}
|
||||
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
|
||||
int32_t chId = getChildIndex(pBlock);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
TSKEY winTs = tsData[i];
|
||||
while (winTs < tsEndData[i]) {
|
||||
while (winTs <= tsEndData[i]) {
|
||||
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
|
||||
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
|
||||
if (chIds) {
|
||||
SArray* chArray = *(SArray**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
qDebug("===stream===retrive window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2903,7 +2904,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
|||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||
pScanInfo->pState = pAggSup->pState;
|
||||
if ((!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo) {
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
|
||||
}
|
||||
pScanInfo->twAggSup = *pTwSup;
|
||||
|
|
|
@ -921,6 +921,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
break;
|
||||
case QUERY_NODE_CREATE_TOPIC_STMT:
|
||||
nodesDestroyNode(((SCreateTopicStmt*)pNode)->pQuery);
|
||||
nodesDestroyNode(((SCreateTopicStmt*)pNode)->pWhere);
|
||||
break;
|
||||
case QUERY_NODE_DROP_TOPIC_STMT: // no pointer field
|
||||
case QUERY_NODE_DROP_CGROUP_STMT: // no pointer field
|
||||
|
|
|
@ -207,7 +207,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
|
|||
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
|
||||
bool withMeta);
|
||||
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
|
||||
bool withMeta);
|
||||
bool withMeta, SNode* pWhere);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SToken* pTopicName);
|
||||
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
|
|
|
@ -115,6 +115,7 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
|
|||
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
|
||||
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
|
||||
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -543,9 +543,9 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C).
|
|||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
|
||||
WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); }
|
||||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
|
||||
AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false); }
|
||||
AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D); }
|
||||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
|
||||
WITH META AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true); }
|
||||
WITH META AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D); }
|
||||
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
|
||||
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
|
||||
|
||||
|
|
|
@ -822,16 +822,9 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
|
|||
|
||||
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
SNode* select = createSelectStmtImpl(isDistinct, pProjectionList, pTable);
|
||||
CHECK_OUT_OF_MEM(select);
|
||||
select->isDistinct = isDistinct;
|
||||
select->pProjectionList = pProjectionList;
|
||||
select->pFromTable = pTable;
|
||||
sprintf(select->stmtName, "%p", select);
|
||||
select->isTimeLineResult = true;
|
||||
select->onlyHasKeepOrderFunc = true;
|
||||
select->timeRange = TSWINDOW_INITIALIZER;
|
||||
return (SNode*)select;
|
||||
return select;
|
||||
}
|
||||
|
||||
static void setSubquery(SNode* pStmt) {
|
||||
|
@ -1712,7 +1705,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
|
|||
}
|
||||
|
||||
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
|
||||
bool withMeta) {
|
||||
bool withMeta, SNode* pWhere) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
if (!checkTopicName(pCxt, pTopicName)) {
|
||||
return NULL;
|
||||
|
@ -1722,6 +1715,8 @@ SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists,
|
|||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||
pStmt->ignoreExists = ignoreExists;
|
||||
pStmt->withMeta = withMeta;
|
||||
pStmt->pWhere = pWhere;
|
||||
|
||||
strcpy(pStmt->subDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->subSTbName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
|
|
|
@ -355,6 +355,11 @@ static int32_t collectMetaKeyFromCreateTopic(SCollectMetaKeyCxt* pCxt, SCreateTo
|
|||
if (NULL != pStmt->pQuery) {
|
||||
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
}
|
||||
if (NULL != pStmt->pWhere) {
|
||||
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, pStmt->subDbName, pStmt->subSTbName,
|
||||
AUTH_TYPE_READ);
|
||||
return code;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,6 +55,13 @@ typedef struct STranslateContext {
|
|||
bool showRewrite;
|
||||
} STranslateContext;
|
||||
|
||||
typedef struct SBuildTopicContext {
|
||||
bool colExists;
|
||||
bool colNotFound;
|
||||
STableMeta* pMeta;
|
||||
SNodeList* pTags;
|
||||
} SBuildTopicContext;
|
||||
|
||||
typedef struct SFullDatabaseName {
|
||||
char fullDbName[TSDB_DB_FNAME_LEN];
|
||||
} SFullDatabaseName;
|
||||
|
@ -5823,6 +5830,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
|
|||
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
|
||||
tNameGetFullDbName(&name, pReq->subDbName);
|
||||
tNameExtractFullName(&name, pReq->subStbName);
|
||||
if(pStmt->pQuery != NULL) {
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||
}
|
||||
} else if ('\0' != pStmt->subDbName[0]) {
|
||||
pReq->subType = TOPIC_SUB_TYPE__DB;
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));
|
||||
|
@ -5845,12 +5855,108 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t addTagList(SNodeList** ppList, SNode* pNode) {
|
||||
if (NULL == *ppList) {
|
||||
*ppList = nodesMakeList();
|
||||
}
|
||||
|
||||
nodesListStrictAppend(*ppList, pNode);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EDealRes checkColumnTagsInCond(SNode* pNode, void* pContext) {
|
||||
SBuildTopicContext* pCxt = (SBuildTopicContext*)pContext;
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
ETableColumnType type;
|
||||
getColumnTypeFromMeta(pCxt->pMeta, ((SColumnNode*)pNode)->colName, &type);
|
||||
if (type == TCOL_TYPE_COLUMN) {
|
||||
pCxt->colExists = true;
|
||||
return DEAL_RES_ERROR;
|
||||
} else if (type == TCOL_TYPE_TAG) {
|
||||
addTagList(&pCxt->pTags, nodesCloneNode(pNode));
|
||||
} else {
|
||||
pCxt->colNotFound = true;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
} else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
if (0 == strcasecmp(pFunc->functionName, "tbname")) {
|
||||
addTagList(&pCxt->pTags, nodesCloneNode(pNode));
|
||||
}
|
||||
}
|
||||
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t checkCollectTopicTags(STranslateContext* pCxt, SCreateTopicStmt* pStmt, STableMeta* pMeta, SNodeList** ppProjection) {
|
||||
SBuildTopicContext colCxt = {.colExists = false, .colNotFound = false, .pMeta = pMeta, .pTags = NULL};
|
||||
nodesWalkExprPostOrder(pStmt->pWhere, checkColumnTagsInCond, &colCxt);
|
||||
if (colCxt.colNotFound) {
|
||||
nodesDestroyList(colCxt.pTags);
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Invalid column name");
|
||||
} else if (colCxt.colExists) {
|
||||
nodesDestroyList(colCxt.pTags);
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Columns are forbidden in where clause");
|
||||
}
|
||||
if (NULL == colCxt.pTags) { // put one column to select
|
||||
// for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
|
||||
SSchema* column = &pMeta->schema[0];
|
||||
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
strcpy(col->colName, column->name);
|
||||
strcpy(col->node.aliasName, col->colName);
|
||||
strcpy(col->node.userAlias, col->colName);
|
||||
addTagList(&colCxt.pTags, (SNode*)col);
|
||||
// }
|
||||
}
|
||||
|
||||
*ppProjection = colCxt.pTags;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SNode** pSelect) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
SRequestConnInfo connInfo = {.pTrans = pParCxt->pTransporter,
|
||||
.requestId = pParCxt->requestId,
|
||||
.requestObjRefId = pParCxt->requestRid,
|
||||
.mgmtEps = pParCxt->mgmtEpSet};
|
||||
SName name;
|
||||
STableMeta* pMeta = NULL;
|
||||
int32_t code = getTableMetaImpl(pCxt, toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name), &pMeta);
|
||||
if (code) {
|
||||
taosMemoryFree(pMeta);
|
||||
return code;
|
||||
}
|
||||
if (TSDB_SUPER_TABLE != pMeta->tableType) {
|
||||
taosMemoryFree(pMeta);
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Only supertable table can be used");
|
||||
}
|
||||
|
||||
SNodeList* pProjection = NULL;
|
||||
code = checkCollectTopicTags(pCxt, pStmt, pMeta, &pProjection);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
strcpy(realTable->table.dbName, pStmt->subDbName);
|
||||
strcpy(realTable->table.tableName, pStmt->subSTbName);
|
||||
strcpy(realTable->table.tableAlias, pStmt->subSTbName);
|
||||
*pSelect = createSelectStmtImpl(true, pProjection, (SNode*)realTable);
|
||||
((SSelectStmt*)*pSelect)->pWhere = nodesCloneNode(pStmt->pWhere);
|
||||
pCxt->pParseCxt->topicQuery = true;
|
||||
code = translateQuery(pCxt, *pSelect);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||
if (NULL == pStmt->pQuery) {
|
||||
if (NULL == pStmt->pQuery && NULL == pStmt->pWhere) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
||||
if (pStmt->pWhere) {
|
||||
return buildQueryForTableTopic(pCxt, pStmt, &pStmt->pQuery);
|
||||
} else if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (!pSelect->isDistinct &&
|
||||
(NULL != pSelect->pFromTable && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) &&
|
||||
|
|
|
@ -667,6 +667,22 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
|
||||
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == select) {
|
||||
return NULL;
|
||||
}
|
||||
select->isDistinct = isDistinct;
|
||||
select->pProjectionList = pProjectionList;
|
||||
select->pFromTable = pTable;
|
||||
sprintf(select->stmtName, "%p", select);
|
||||
select->isTimeLineResult = true;
|
||||
select->onlyHasKeepOrderFunc = true;
|
||||
select->timeRange = TSWINDOW_INITIALIZER;
|
||||
return (SNode*)select;
|
||||
}
|
||||
|
||||
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
|
||||
if (NULL == *pHash) {
|
||||
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1145,6 +1145,15 @@ TEST_F(ParserInitialCTest, createTopic) {
|
|||
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1", nullptr, "test", "st1", 1);
|
||||
run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1");
|
||||
clearCreateTopicReq();
|
||||
|
||||
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 as stable st1 where tag1 > 0", nullptr, "test", "st1");
|
||||
run("CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1 WHERE tag1 > 0");
|
||||
clearCreateTopicReq();
|
||||
|
||||
setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1 where tag1 > 0", nullptr, "test", "st1", 1);
|
||||
run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1 WHERE tag1 > 0");
|
||||
clearCreateTopicReq();
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -454,6 +454,18 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
|
||||
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
|
||||
for (int32_t i = 0; i < nums; ++i) {
|
||||
if (0 == strcmp(pName, pMeta->schema[i].name)) {
|
||||
*pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
*pType = TCOL_TYPE_NONE;
|
||||
}
|
||||
|
||||
void freeVgInfo(SDBVgInfo* vgInfo) {
|
||||
if (NULL == vgInfo) {
|
||||
return;
|
||||
|
|
|
@ -38,6 +38,15 @@ typedef struct {
|
|||
rocksdb_comparator_t** pCompares;
|
||||
} RocksdbCfInst;
|
||||
|
||||
uint32_t nextPow2(uint32_t x) {
|
||||
x = x - 1;
|
||||
x = x | (x >> 1);
|
||||
x = x | (x >> 2);
|
||||
x = x | (x >> 4);
|
||||
x = x | (x >> 8);
|
||||
x = x | (x >> 16);
|
||||
return x + 1;
|
||||
}
|
||||
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
|
||||
|
||||
void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
||||
|
@ -92,6 +101,8 @@ void* streamBackendInit(const char* path) {
|
|||
rocksdb_options_set_recycle_log_file_num(opts, 6);
|
||||
rocksdb_options_set_max_write_buffer_number(opts, 2);
|
||||
rocksdb_options_set_info_log_level(opts, 0);
|
||||
uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache);
|
||||
rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20);
|
||||
|
||||
pHandle->env = env;
|
||||
pHandle->dbOpt = opts;
|
||||
|
|
|
@ -1062,7 +1062,7 @@ _end:
|
|||
}
|
||||
|
||||
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
|
||||
qWarn("try to write to cf parname");
|
||||
qDebug("try to write to cf parname");
|
||||
#ifdef USE_ROCKSDB
|
||||
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
|
||||
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
|
||||
|
|
|
@ -497,6 +497,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/db.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqError.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilterWhere.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData1.py
|
||||
|
|
|
@ -108,4 +108,15 @@ if $rows != 6 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create topic topic_stable_1 as stable stb where t1 > 0
|
||||
sql create topic topic_stable_2 as stable stb where t1 > 0 and t1 < 0
|
||||
sql create topic topic_stable_3 as stable stb where 1 > 0
|
||||
sql create topic topic_stable_4 as stable stb where abs(t1) > 0
|
||||
sql_error create topic topic_stable_5 as stable stb where last(t1) > 0
|
||||
sql_error create topic topic_stable_5 as stable stb where sum(t1) > 0
|
||||
sql create topic topic_stable_6 as stable stb where tbname is not null
|
||||
sql create topic topic_stable_7 as stable stb where tbname > 'a'
|
||||
sql_error create topic topic_stable_8 as stable stb where tbname > 0 and xx < 0
|
||||
sql_error create topic topic_stable_9 as stable stb where tbname > 0 and c1 < 0
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -0,0 +1,227 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 2,
|
||||
'showMsg': 1,
|
||||
'showRow': 1}
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tmqCom.create_database(tsql=tdSql, dbName=paraDict["dbName"],dropFlag=paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=paraDict['replica'])
|
||||
tdSql.execute("alter database %s wal_retention_period 3600"%(paraDict["dbName"]))
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
return
|
||||
|
||||
def tmqCase_columnError(self, topicName, condition):
|
||||
tdLog.printNoPrefix("======== test case error: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 2,
|
||||
'showMsg': 1,
|
||||
'showRow': 1}
|
||||
|
||||
tdLog.info("create topics from stb with column filter")
|
||||
topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
|
||||
tdLog.info("create topic sql: %s"%topicString)
|
||||
tdSql.error(topicString)
|
||||
|
||||
def tmqCase(self, topicName, condition):
|
||||
tdLog.printNoPrefix("======== test case: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 2,
|
||||
'showMsg': 1,
|
||||
'showRow': 1}
|
||||
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with tag filter")
|
||||
topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
|
||||
tdLog.info("create topic sql: %s"%topicString)
|
||||
tdSql.execute(topicString)
|
||||
|
||||
queryString = "select * from %s.%s where %s" %(paraDict['dbName'], paraDict['stbName'], condition)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||
topicList = topicName
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("0 tmq consume rows error!")
|
||||
|
||||
tdLog.printNoPrefix("======== test case end ...... ")
|
||||
|
||||
def tmqCase_addNewTable_dropTag(self, topicName, condition):
|
||||
tdLog.printNoPrefix("======== test case1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 2,
|
||||
'showMsg': 1,
|
||||
'showRow': 1}
|
||||
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with tag filter")
|
||||
topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
|
||||
tdLog.info("create topic sql: %s"%topicString)
|
||||
tdSql.execute(topicString)
|
||||
|
||||
queryString = "select * from %s.%s where %s" %(paraDict['dbName'], paraDict['stbName'], condition)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows() + 1)
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||
topicList = topicName
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
|
||||
|
||||
#add new table with one data
|
||||
tdLog.info("start insert data")
|
||||
insertString = "insert into %s.tmp using %s.%s tags(1, 1, 1, 't4', 't5') values(now, 1, 1, 1, 'c4', 'c5', now)" %(paraDict['dbName'], paraDict['dbName'], paraDict['stbName'])
|
||||
tdSql.execute(insertString)
|
||||
|
||||
#test drop tag
|
||||
tdSql.error("alter stable %s.%s drop tag t1" %(paraDict['dbName'], paraDict['stbName']))
|
||||
tdSql.execute("alter stable %s.%s drop tag t2" %(paraDict['dbName'], paraDict['stbName']))
|
||||
tdSql.execute("alter stable %s.%s drop column c2" %(paraDict['dbName'], paraDict['stbName']))
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("0 tmq consume rows error!")
|
||||
|
||||
tdLog.printNoPrefix("======== test case1 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase_columnError("t1", "c1 = 4 and t1 = 3")
|
||||
self.tmqCase("t2", "2 > 1")
|
||||
self.tmqCase("t3", "t4 = 'beijing'")
|
||||
self.tmqCase("t4", "t4 > t3")
|
||||
self.tmqCase("t5", "t3 = t4")
|
||||
self.tmqCase_addNewTable_dropTag("t6", "t1 = 1")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue