Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
commit 9d1e90f914
@@ -2009,10 +2009,8 @@ typedef struct {
  int8_t withMeta;
  char*  sql;
  char   subDbName[TSDB_DB_FNAME_LEN];
  union {
    char* ast;
    char  subStbName[TSDB_TABLE_FNAME_LEN];
  };
  char* ast;
  char  subStbName[TSDB_TABLE_FNAME_LEN];
} SCMCreateTopicReq;

int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);

@@ -2822,6 +2820,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
    tlen += taosEncodeString(buf, pReq->qmsg);
  } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
    tlen += taosEncodeFixedI64(buf, pReq->suid);
    tlen += taosEncodeString(buf, pReq->qmsg);
  }
  return tlen;
}

@@ -2838,6 +2837,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
    buf = taosDecodeString(buf, &pReq->qmsg);
  } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
    buf = taosDecodeFixedI64(buf, &pReq->suid);
    buf = taosDecodeString(buf, &pReq->qmsg);
  }
  return (void*)buf;
}

@@ -354,6 +354,7 @@
#define TK_WAL 336

#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602

@@ -82,6 +82,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id);

int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo);

/**
 * set the task Id, usually used by message queue process
 * @param tinfo
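Note: a minimal calling sketch for the new qGetTableList() helper, assuming a vnode handle, an optional deserialized SSubplan node and a queue exec task created by qCreateQueueExecTaskInfo; it mirrors the usage added in tq.c further down and is illustrative only (variable names are placeholders).

  SArray* tbUidList = NULL;
  int32_t ret = qGetTableList(suid, pVnode, pSubplanNode, &tbUidList, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {
    // resolving the child-table uid list failed; release whatever was allocated
    taosArrayDestroy(tbUidList);
  } else {
    // hand the resolved uid list to the tq reader, then release the local copy
    tqReaderSetTbUidList(pTqReader, tbUidList);
    taosArrayDestroy(tbUidList);
  }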
@@ -364,6 +364,7 @@ typedef struct SCreateTopicStmt {
  bool ignoreExists;
  bool withMeta;
  SNode* pQuery;
  SNode* pWhere;
} SCreateTopicStmt;

typedef struct SDropTopicStmt {

@@ -51,6 +51,12 @@ typedef enum {
  TARGET_TYPE_OTHER,
} ETargetType;

typedef enum {
  TCOL_TYPE_COLUMN = 1,
  TCOL_TYPE_TAG,
  TCOL_TYPE_NONE,
} ETableColumnType;

#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3

@@ -253,6 +259,7 @@ void destroyQueryExecRes(SExecResult* pRes);
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
void freeVgInfo(SDBVgInfo* vgInfo);
@@ -4002,11 +4002,16 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
  if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1;
  if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
  if (TOPIC_SUB_TYPE__DB == pReq->subType) {
  } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
    if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
  } else {
    if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
    if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
  if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
    if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
  }
  if (pReq->ast && strlen(pReq->ast) > 0) {
    if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
    if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
  } else {
    if (tEncodeI32(&encoder, 0) < 0) return -1;
  }
  }
  if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
  if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;

@@ -4032,9 +4037,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
  if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1;
  if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
  if (TOPIC_SUB_TYPE__DB == pReq->subType) {
  } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
    if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
  } else {
  if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
    if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
  }
  if (tDecodeI32(&decoder, &astLen) < 0) return -1;
  if (astLen > 0) {
    pReq->ast = taosMemoryCalloc(1, astLen + 1);

@@ -4057,7 +4063,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR

void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
  taosMemoryFreeClear(pReq->sql);
  if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
  if (TOPIC_SUB_TYPE__DB != pReq->subType) {
    taosMemoryFreeClear(pReq->ast);
  }
}
@@ -513,7 +513,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
  }else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){
    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return -1;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return -1;
    }
    nodesDestroyNode(pAst);
  }

  if(pPlan){
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);

@@ -554,7 +570,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib

    mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);

    if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    if (pSubplan) {
      int32_t msgLen;

      pSubplan->execNode.epSet = pVgEp->epSet;
@@ -1230,7 +1230,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,

    mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
          pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
    if (pTopic->ast == NULL) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

@@ -2272,7 +2272,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
      }
    }

    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
    if (pTopic->ast == NULL) {
      sdbRelease(pSdb, pTopic);
      continue;
    }
@@ -422,6 +422,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
      mError("failed to create topic:%s since %s", pCreate->name, terrstr());
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
      nodesDestroyNode(pAst);
      return -1;
    }

@@ -429,6 +430,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
    if (topicObj.ntbColIds == NULL) {
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
      nodesDestroyNode(pAst);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

@@ -444,6 +446,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
      nodesDestroyNode(pAst);
      return -1;
    }

@@ -465,6 +468,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
    strcpy(topicObj.stbName, pCreate->subStbName);
    topicObj.stbUid = pStb->uid;
    mndReleaseStb(pMnode, pStb);
    if(pCreate->ast != NULL){
      qDebugL("topic:%s ast %s", topicObj.name, pCreate->ast);
      topicObj.ast = taosStrdup(pCreate->ast);
      topicObj.astLen = strlen(pCreate->ast) + 1;
    }
  }
  /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
  /*topicObj.ast = NULL;*/
@@ -72,6 +72,8 @@ typedef struct {

typedef struct {
  int64_t suid;
  char* qmsg;  // SubPlanToString
  SNode* node;
} STqExecTb;

typedef struct {

@@ -75,6 +75,8 @@ static void destroyTqHandle(void* data) {
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    walCloseReader(pData->pWalReader);
    tqReaderClose(pData->execHandle.pTqReader);
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
  }
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);

@@ -655,7 +657,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    uint64_t oldConsumerId = pHandle->consumerId;
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

@@ -700,26 +701,37 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;
      pHandle->execHandle.execTb.qmsg = req.qmsg;
      req.qmsg = NULL;

      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
        if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
          tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
                  pVnode->config.vgId, req.subKey, pHandle->consumerId);
          return -1;
        }
      }
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);

      SArray* tbUidList = NULL;
      ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
      if(ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId);
        taosArrayDestroy(tbUidList);
        goto end;
      }
      tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);
    }

    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
    tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  } else {

@@ -1272,13 +1284,15 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t taskId = ntohl(pRsp->upstreamTaskId);
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  int32_t vgId = pTq->pStreamMeta->vgId;
  if (pTask) {
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    return -1;
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }
}
@@ -37,6 +37,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
    if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;

@@ -64,6 +65,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
    if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
  }
  tEndDecode(pDecoder);
  return 0;

@@ -337,20 +339,27 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
    } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      if(strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
        if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
          tqError("nodesStringToNode error in sub stable, since %s", terrstr());
          return -1;
        }
      }
      handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
      tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
                       handle.fetchMeta, (SSnapContext**)(&reader.sContext));
      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);

      SArray* tbUidList = NULL;
      int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
      if(ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
        taosArrayDestroy(tbUidList);
        goto end;
      }
      tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
      handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
      tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);
    }
    tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
    taosWLockLatch(&pTq->lock);
@@ -1083,34 +1083,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      if (isAdd) {
        SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
        SMetaReader mr = {0};
        metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
        for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
          uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);

          int32_t code = metaReaderGetTableEntryByUidCache(&mr, *id);
          if (code != TSDB_CODE_SUCCESS) {
            tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
            continue;
          }

          tDecoderClear(&mr.coder);
          if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) {
            tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
            continue;
          }

          tqDebug("table uid %" PRId64 " add to tq handle", *id);
          taosArrayPush(qa, id);
        SArray* list = NULL;
        int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
        if(ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
          taosArrayDestroy(list);
          return ret;
        }

        metaReaderClear(&mr);
        if (taosArrayGetSize(qa) > 0) {
          tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa);
        }

        taosArrayDestroy(qa);
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list);
        taosArrayDestroy(list);
      } else {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
      }
@@ -137,7 +137,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d

  int32_t blockSz = taosArrayGetSize(pBlocks);

  tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
  tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz);

  void* pBuf = NULL;
  SArray* tagArray = NULL;

@@ -482,17 +482,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
      tEncoderClear(&encoder);
      tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);

      SRpcMsg msg = {
          .msgType = TDMT_VND_SUBMIT,
          .pCont = pBuf,
          .contLen = len,
      };

      SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put into write-queue since %s", terrstr());
      }
    }
  }

  tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr);

_end:
  taosArrayDestroy(tagArray);
  taosArrayDestroy(pVals);
@@ -1121,6 +1121,27 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
    endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
  }

  if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
      (pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
    int32_t i = endPos;

    if (asc) {
      for(; i >= 0; --i) {
        if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
          break;
        }
      }
    } else {
      for(; i < pBlock->nRow; ++i) {
        if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
          break;
        }
      }
    }

    endPos = i;
  }

  return endPos;
}
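Note: an illustrative, self-contained sketch of the version-trimming idea added above, using plain arrays instead of the reader's internal structures; the function name trimEndPosForVersion is hypothetical.

  // Given per-row versions for rows sorted by timestamp, pull an ascending-scan
  // end position back until the row version no longer exceeds the upper bound.
  static int32_t trimEndPosForVersion(const int64_t* aVersion, int32_t endPos, int64_t maxVer) {
    int32_t i = endPos;
    while (i >= 0 && aVersion[i] > maxVer) {
      --i;
    }
    return i;  // may become -1 when no row satisfies the version range
  }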
@@ -1260,10 +1281,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
    return 0;
  }

  // row index of dump info remain the initial position, let's find the appropriate start position.
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts && pReader->verRange.minVer <= pBlock->minVer) {
      // pDumpInfo->rowIndex = 0;
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts && pReader->verRange.maxVer >= pBlock->maxVer) {
      // pDumpInfo->rowIndex = pBlock->nRow - 1;
    } else {  // find the appropriate the start position in current block, and set it to be the current rowIndex
      int32_t pos = asc ? pBlock->nRow - 1 : 0;

@@ -1279,6 +1301,29 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
                pBlock->maxVer, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }

      ASSERT(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.maxVer >= pBlock->minVer);

      // find the appropriate start position that satisfies the version requirement.
      if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
          (pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
        int32_t i = pDumpInfo->rowIndex;
        if (asc) {
          for(; i < pBlock->nRow; ++i) {
            if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
              break;
            }
          }
        } else {
          for(; i >= 0; --i) {
            if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
              break;
            }
          }
        }

        pDumpInfo->rowIndex = i;
      }
    }
  }

@@ -1293,6 +1338,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->resBlockInfo.capacity) {  // output buffer check
    dumpedRows = pReader->resBlockInfo.capacity;
  } else if (dumpedRows <= 0) {  // no qualified rows in current data block, abort directly.
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  int32_t i = 0;

@@ -1848,7 +1896,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
  bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
                            info.overlapWithDelInfo || info.overlapWithLastBlock || info.partiallyRequired);
                            info.overlapWithDelInfo || info.overlapWithLastBlock);
  return isCleanFileBlock;
}

@@ -2809,7 +2857,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->resBlockInfo.capacity) {
      if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
      if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
        code = copyBlockDataToSDataBlock(pReader);
        if (code) {
          goto _end;

@@ -2828,7 +2876,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
    }
  }

  SBlockData* pBlockData = &pReader->status.fileBlockData;
  SBlockData* pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

@@ -2842,7 +2890,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {

        pDumpInfo->rowIndex += step;

        SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
        pBlock = getCurrentBlock(&pReader->status.blockIter);
        if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
          pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

@@ -2870,7 +2918,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }
@@ -960,6 +960,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
  SArray* pBlockList = NULL;
  SSDataBlock* pResBlock = NULL;
  SScalarParam output = {0};
  SArray* pUidTagList = NULL;

  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);

@@ -979,7 +980,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};

  //  int64_t stt = taosGetTimestampUs();
  SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
  copyExistedUids(pUidTagList, pUidList);

  FilterCondType condType = checkTagCond(pTagCond);

@@ -1151,6 +1152,21 @@ _end:
  return code;
}

int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){
  SSubplan *pSubplan = (SSubplan *)node;
  SScanPhysiNode pNode = {0};
  pNode.suid = suid;
  pNode.uid = suid;
  pNode.tableType = TSDB_SUPER_TABLE;
  STableListInfo* pTableListInfo = tableListCreate();
  uint8_t digest[17] = {0};
  int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
  *tableList = pTableListInfo->pTableList;
  pTableListInfo->pTableList = NULL;
  tableListDestroy(pTableListInfo);
  return code;
}

size_t getTableTagsBufLen(const SNodeList* pGroups) {
  size_t keyLen = 0;
@@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;
    qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks);

    qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks);
    ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);

    if (type == STREAM_INPUT__MERGED_SUBMIT) {

@@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
        taosArrayPush(pInfo->pBlockLists, pReq);
      }

      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      taosArrayPush(pInfo->pBlockLists, input);

@@ -150,6 +152,7 @@
        SPackedData tmp = { .pDataBlock = pDataBlock };
        taosArrayPush(pInfo->pBlockLists, &tmp);
      }

      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else {
      ASSERT(0);

@@ -1797,9 +1797,10 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)

static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  const char* id = GET_TASKID(pTaskInfo);

  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));

@@ -1922,7 +1923,9 @@ FETCH_NEXT_BLOCK:
      return NULL;
    }

    int32_t current = pInfo->validBlockIndex++;
    int32_t current = pInfo->validBlockIndex++;
    qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);

    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
    if (pBlock->info.parTbName[0]) {

@@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK:
      return pInfo->pUpdateRes;
    }

    const char* id = GET_TASKID(pTaskInfo);
    SSDataBlock* pBlock = pInfo->pRes;
    SDataBlockInfo* pBlockInfo = &pBlock->info;
    int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
@@ -921,6 +921,7 @@ void nodesDestroyNode(SNode* pNode) {
      break;
    case QUERY_NODE_CREATE_TOPIC_STMT:
      nodesDestroyNode(((SCreateTopicStmt*)pNode)->pQuery);
      nodesDestroyNode(((SCreateTopicStmt*)pNode)->pWhere);
      break;
    case QUERY_NODE_DROP_TOPIC_STMT:    // no pointer field
    case QUERY_NODE_DROP_CGROUP_STMT:   // no pointer field

@@ -207,7 +207,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
                                  bool withMeta);
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
                                     bool withMeta);
                                     bool withMeta, SNode* pWhere);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SToken* pTopicName);
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);

@@ -115,6 +115,7 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable);

#ifdef __cplusplus
}

@@ -543,9 +543,9 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C).
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
          WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
          AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false); }
          AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
          WITH META AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true); }
          WITH META AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
@@ -822,16 +822,9 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {

SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
  CHECK_PARSER_STATUS(pCxt);
  SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
  SNode* select = createSelectStmtImpl(isDistinct, pProjectionList, pTable);
  CHECK_OUT_OF_MEM(select);
  select->isDistinct = isDistinct;
  select->pProjectionList = pProjectionList;
  select->pFromTable = pTable;
  sprintf(select->stmtName, "%p", select);
  select->isTimeLineResult = true;
  select->onlyHasKeepOrderFunc = true;
  select->timeRange = TSWINDOW_INITIALIZER;
  return (SNode*)select;
  return select;
}

static void setSubquery(SNode* pStmt) {

@@ -1712,7 +1705,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
}

SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
                                     bool withMeta) {
                                     bool withMeta, SNode* pWhere) {
  CHECK_PARSER_STATUS(pCxt);
  if (!checkTopicName(pCxt, pTopicName)) {
    return NULL;

@@ -1722,6 +1715,8 @@ SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists,
  COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
  pStmt->ignoreExists = ignoreExists;
  pStmt->withMeta = withMeta;
  pStmt->pWhere = pWhere;

  strcpy(pStmt->subDbName, ((SRealTableNode*)pRealTable)->table.dbName);
  strcpy(pStmt->subSTbName, ((SRealTableNode*)pRealTable)->table.tableName);
  nodesDestroyNode(pRealTable);

@@ -355,6 +355,11 @@ static int32_t collectMetaKeyFromCreateTopic(SCollectMetaKeyCxt* pCxt, SCreateTo
  if (NULL != pStmt->pQuery) {
    return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
  }
  if (NULL != pStmt->pWhere) {
    int32_t code = collectMetaKeyFromRealTableImpl(pCxt, pStmt->subDbName, pStmt->subSTbName,
                                                   AUTH_TYPE_READ);
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
@@ -55,6 +55,13 @@ typedef struct STranslateContext {
  bool showRewrite;
} STranslateContext;

typedef struct SBuildTopicContext {
  bool colExists;
  bool colNotFound;
  STableMeta* pMeta;
  SNodeList* pTags;
} SBuildTopicContext;

typedef struct SFullDatabaseName {
  char fullDbName[TSDB_DB_FNAME_LEN];
} SFullDatabaseName;

@@ -5823,6 +5830,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
    toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
    tNameGetFullDbName(&name, pReq->subDbName);
    tNameExtractFullName(&name, pReq->subStbName);
    if(pStmt->pQuery != NULL) {
      code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
    }
  } else if ('\0' != pStmt->subDbName[0]) {
    pReq->subType = TOPIC_SUB_TYPE__DB;
    tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));

@@ -5845,12 +5855,108 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
  return code;
}

static int32_t addTagList(SNodeList** ppList, SNode* pNode) {
  if (NULL == *ppList) {
    *ppList = nodesMakeList();
  }

  nodesListStrictAppend(*ppList, pNode);

  return TSDB_CODE_SUCCESS;
}

static EDealRes checkColumnTagsInCond(SNode* pNode, void* pContext) {
  SBuildTopicContext* pCxt = (SBuildTopicContext*)pContext;
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    ETableColumnType type;
    getColumnTypeFromMeta(pCxt->pMeta, ((SColumnNode*)pNode)->colName, &type);
    if (type == TCOL_TYPE_COLUMN) {
      pCxt->colExists = true;
      return DEAL_RES_ERROR;
    } else if (type == TCOL_TYPE_TAG) {
      addTagList(&pCxt->pTags, nodesCloneNode(pNode));
    } else {
      pCxt->colNotFound = true;
      return DEAL_RES_ERROR;
    }
  } else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    if (0 == strcasecmp(pFunc->functionName, "tbname")) {
      addTagList(&pCxt->pTags, nodesCloneNode(pNode));
    }
  }

  return DEAL_RES_CONTINUE;
}

static int32_t checkCollectTopicTags(STranslateContext* pCxt, SCreateTopicStmt* pStmt, STableMeta* pMeta, SNodeList** ppProjection) {
  SBuildTopicContext colCxt = {.colExists = false, .colNotFound = false, .pMeta = pMeta, .pTags = NULL};
  nodesWalkExprPostOrder(pStmt->pWhere, checkColumnTagsInCond, &colCxt);
  if (colCxt.colNotFound) {
    nodesDestroyList(colCxt.pTags);
    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Invalid column name");
  } else if (colCxt.colExists) {
    nodesDestroyList(colCxt.pTags);
    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Columns are forbidden in where clause");
  }
  if (NULL == colCxt.pTags) {  // put one column to select
    //    for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
    SSchema* column = &pMeta->schema[0];
    SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
    strcpy(col->colName, column->name);
    strcpy(col->node.aliasName, col->colName);
    strcpy(col->node.userAlias, col->colName);
    addTagList(&colCxt.pTags, (SNode*)col);
    //    }
  }

  *ppProjection = colCxt.pTags;
  return TSDB_CODE_SUCCESS;
}

static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SNode** pSelect) {
  SParseContext* pParCxt = pCxt->pParseCxt;
  SRequestConnInfo connInfo = {.pTrans = pParCxt->pTransporter,
                               .requestId = pParCxt->requestId,
                               .requestObjRefId = pParCxt->requestRid,
                               .mgmtEps = pParCxt->mgmtEpSet};
  SName name;
  STableMeta* pMeta = NULL;
  int32_t code = getTableMetaImpl(pCxt, toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name), &pMeta);
  if (code) {
    taosMemoryFree(pMeta);
    return code;
  }
  if (TSDB_SUPER_TABLE != pMeta->tableType) {
    taosMemoryFree(pMeta);
    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Only supertable table can be used");
  }

  SNodeList* pProjection = NULL;
  code = checkCollectTopicTags(pCxt, pStmt, pMeta, &pProjection);
  if (TSDB_CODE_SUCCESS == code) {
    SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
    strcpy(realTable->table.dbName, pStmt->subDbName);
    strcpy(realTable->table.tableName, pStmt->subSTbName);
    strcpy(realTable->table.tableAlias, pStmt->subSTbName);
    *pSelect = createSelectStmtImpl(true, pProjection, (SNode*)realTable);
    ((SSelectStmt*)*pSelect)->pWhere = nodesCloneNode(pStmt->pWhere);
    pCxt->pParseCxt->topicQuery = true;
    code = translateQuery(pCxt, *pSelect);
  }

  taosMemoryFree(pMeta);
  return code;
}

static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
  if (NULL == pStmt->pQuery) {
  if (NULL == pStmt->pQuery && NULL == pStmt->pWhere) {
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
  if (pStmt->pWhere) {
    return buildQueryForTableTopic(pCxt, pStmt, &pStmt->pQuery);
  } else if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
    SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
    if (!pSelect->isDistinct &&
        (NULL != pSelect->pFromTable && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) &&
@@ -667,6 +667,22 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
  return code;
}

SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable) {
  SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
  if (NULL == select) {
    return NULL;
  }
  select->isDistinct = isDistinct;
  select->pProjectionList = pProjectionList;
  select->pFromTable = pTable;
  sprintf(select->stmtName, "%p", select);
  select->isTimeLineResult = true;
  select->onlyHasKeepOrderFunc = true;
  select->timeRange = TSWINDOW_INITIALIZER;
  return (SNode*)select;
}

static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
  if (NULL == *pHash) {
    *pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
(File diff suppressed because it is too large.)
@@ -1145,6 +1145,15 @@ TEST_F(ParserInitialCTest, createTopic) {
  setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1", nullptr, "test", "st1", 1);
  run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1");
  clearCreateTopicReq();

  setCreateTopicReq("tp1", 1, "create topic if not exists tp1 as stable st1 where tag1 > 0", nullptr, "test", "st1");
  run("CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1 WHERE tag1 > 0");
  clearCreateTopicReq();

  setCreateTopicReq("tp1", 1, "create topic if not exists tp1 with meta as stable st1 where tag1 > 0", nullptr, "test", "st1", 1);
  run("CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1 WHERE tag1 > 0");
  clearCreateTopicReq();
}

/*
@@ -454,6 +454,18 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  return TSDB_CODE_SUCCESS;
}

void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
  for (int32_t i = 0; i < nums; ++i) {
    if (0 == strcmp(pName, pMeta->schema[i].name)) {
      *pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
      return;
    }
  }

  *pType = TCOL_TYPE_NONE;
}

void freeVgInfo(SDBVgInfo* vgInfo) {
  if (NULL == vgInfo) {
    return;
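Note: an illustrative caller sketch for getColumnTypeFromMeta(), assuming a resolved STableMeta and an identifier name taken from a CREATE TOPIC ... WHERE condition; it mirrors how checkColumnTagsInCond above classifies identifiers, and the surrounding variable names are placeholders.

  ETableColumnType type;
  getColumnTypeFromMeta(pMeta, colName, &type);
  if (type == TCOL_TYPE_COLUMN) {
    // data columns are rejected in the topic filter condition
  } else if (type == TCOL_TYPE_TAG) {
    // tags are accepted and later collected as projections of the generated query
  } else {
    // TCOL_TYPE_NONE: the identifier does not exist in the table schema
  }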
@@ -126,7 +126,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
  if (pBlock == NULL) {
    streamTaskInputFail(pTask);
    status = TASK_INPUT_STATUS__FAILED;
    qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
    qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
           pTask->id.idStr);
  } else {
    int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);

@@ -233,11 +233,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}

int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp);
    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
    if (leftRsp > 0) {
      return 0;
    }

@@ -251,6 +251,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    // TODO: init recover timer
    qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId);

    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus);

    return 0;
  }
|
|||
int8_t old =
|
||||
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr);
|
||||
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
|
||||
return 0;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
|
||||
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
|
||||
if (pDispatchedBlock == NULL) {
|
||||
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
|
||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||
qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
|
||||
pTask->outputStatus);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamQueueProcessFail(pTask->outputQueue);
|
||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||
qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
}
|
||||
|
||||
// this block can be freed only when it has been pushed to down stream.
|
||||
|
|
|
@@ -16,9 +16,9 @@
#include "streamInc.h"

// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 8
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100

static int32_t updateCheckPointInfo (SStreamTask* pTask);

@@ -385,7 +385,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
      qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }
@@ -497,6 +497,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/db.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqError.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilterWhere.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData1.py

@@ -108,4 +108,15 @@ if $rows != 6 then
  return -1
endi

sql create topic topic_stable_1 as stable stb where t1 > 0
sql create topic topic_stable_2 as stable stb where t1 > 0 and t1 < 0
sql create topic topic_stable_3 as stable stb where 1 > 0
sql create topic topic_stable_4 as stable stb where abs(t1) > 0
sql_error create topic topic_stable_5 as stable stb where last(t1) > 0
sql_error create topic topic_stable_5 as stable stb where sum(t1) > 0
sql create topic topic_stable_6 as stable stb where tbname is not null
sql create topic topic_stable_7 as stable stb where tbname > 'a'
sql_error create topic topic_stable_8 as stable stb where tbname > 0 and xx < 0
sql_error create topic topic_stable_9 as stable stb where tbname > 0 and c1 < 0

system sh/exec.sh -n dnode1 -s stop -x SIGINT
@@ -0,0 +1,227 @@

import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())
        #tdSql.init(conn.cursor(), logSql) # output sql.txt file

    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName': 'dbt',
                    'dropFlag': 1,
                    'event': '',
                    'vgroups': 4,
                    'replica': 1,
                    'stbName': 'stb',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix': 'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum': 10,
                    'rowsPerTbl': 10000,
                    'batchNum': 100,
                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
                    'pollDelay': 2,
                    'showMsg': 1,
                    'showRow': 1}

        tmqCom.initConsumerTable()
        tmqCom.create_database(tsql=tdSql, dbName=paraDict["dbName"],dropFlag=paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=paraDict['replica'])
        tdSql.execute("alter database %s wal_retention_period 3600"%(paraDict["dbName"]))
        tdLog.info("create stb")
        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
        tdLog.info("create ctb")
        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
        tdLog.info("insert data")
        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
        return

    def tmqCase_columnError(self, topicName, condition):
        tdLog.printNoPrefix("======== test case error: ")
        paraDict = {'dbName': 'dbt',
                    'dropFlag': 1,
                    'event': '',
                    'vgroups': 4,
                    'replica': 1,
                    'stbName': 'stb',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix': 'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum': 10,
                    'rowsPerTbl': 10000,
                    'batchNum': 100,
                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
                    'pollDelay': 2,
                    'showMsg': 1,
                    'showRow': 1}

        tdLog.info("create topics from stb with column filter")
        topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
        tdLog.info("create topic sql: %s"%topicString)
        tdSql.error(topicString)

    def tmqCase(self, topicName, condition):
        tdLog.printNoPrefix("======== test case: ")
        paraDict = {'dbName': 'dbt',
                    'dropFlag': 1,
                    'event': '',
                    'vgroups': 4,
                    'replica': 1,
                    'stbName': 'stb',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix': 'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum': 10,
                    'rowsPerTbl': 10000,
                    'batchNum': 100,
                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
                    'pollDelay': 2,
                    'showMsg': 1,
                    'showRow': 1}

        expectRowsList = []
        tmqCom.initConsumerTable()

        tdLog.info("create topics from stb with tag filter")
        topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
        tdLog.info("create topic sql: %s"%topicString)
        tdSql.execute(topicString)

        queryString = "select * from %s.%s where %s" %(paraDict['dbName'], paraDict['stbName'], condition)
        tdSql.query(queryString)
        expectRowsList.append(tdSql.getRows())

        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId = 0
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        topicList = topicName
        ifcheckdata = 0
        ifManualCommit = 1
        keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        tdLog.info("wait the consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)

        if expectRowsList[0] != resultList[0]:
            tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
            tdLog.exit("0 tmq consume rows error!")

        tdLog.printNoPrefix("======== test case end ...... ")

    def tmqCase_addNewTable_dropTag(self, topicName, condition):
        tdLog.printNoPrefix("======== test case1: ")
        paraDict = {'dbName': 'dbt',
                    'dropFlag': 1,
                    'event': '',
                    'vgroups': 4,
                    'replica': 1,
                    'stbName': 'stb',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix': 'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum': 10,
                    'rowsPerTbl': 10000,
                    'batchNum': 100,
                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
                    'pollDelay': 2,
                    'showMsg': 1,
                    'showRow': 1}

        expectRowsList = []
        tmqCom.initConsumerTable()

        tdLog.info("create topics from stb with tag filter")
        topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition)
        tdLog.info("create topic sql: %s"%topicString)
        tdSql.execute(topicString)

        queryString = "select * from %s.%s where %s" %(paraDict['dbName'], paraDict['stbName'], condition)
        tdSql.query(queryString)
        expectRowsList.append(tdSql.getRows() + 1)

        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId = 0
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        topicList = topicName
        ifcheckdata = 0
        ifManualCommit = 1
        keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        #add new table with one data
        tdLog.info("start insert data")
        insertString = "insert into %s.tmp using %s.%s tags(1, 1, 1, 't4', 't5') values(now, 1, 1, 1, 'c4', 'c5', now)" %(paraDict['dbName'], paraDict['dbName'], paraDict['stbName'])
        tdSql.execute(insertString)

        #test drop tag
        tdSql.error("alter stable %s.%s drop tag t1" %(paraDict['dbName'], paraDict['stbName']))
        tdSql.execute("alter stable %s.%s drop tag t2" %(paraDict['dbName'], paraDict['stbName']))
        tdSql.execute("alter stable %s.%s drop column c2" %(paraDict['dbName'], paraDict['stbName']))

        tdLog.info("wait the consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)

        if expectRowsList[0] != resultList[0]:
            tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
            tdLog.exit("0 tmq consume rows error!")

        tdLog.printNoPrefix("======== test case1 end ...... ")

    def run(self):
        tdSql.prepare()
        self.prepareTestEnv()
        self.tmqCase_columnError("t1", "c1 = 4 and t1 = 3")
        self.tmqCase("t2", "2 > 1")
        self.tmqCase("t3", "t4 = 'beijing'")
        self.tmqCase("t4", "t4 > t3")
        self.tmqCase("t5", "t3 = t4")
        self.tmqCase_addNewTable_dropTag("t6", "t1 = 1")

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())