feat:add tag filter for stable subscribe

This commit is contained in:
wangmm0220 2023-05-24 18:59:47 +08:00
parent 8537e4e80e
commit 357e86b994
3 changed files with 12 additions and 26 deletions

View File

@ -1945,11 +1945,8 @@ typedef struct {
int8_t withMeta;
char* sql;
char subDbName[TSDB_DB_FNAME_LEN];
union {
char* ast;
char subStbName[TSDB_TABLE_FNAME_LEN];
};
char* subStbFilterAst;
char* ast;
char subStbName[TSDB_TABLE_FNAME_LEN];
} SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);

View File

@ -3832,19 +3832,16 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
} else {
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
}
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
}
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeI32(&encoder, strlen(pReq->subStbFilterAst)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subStbFilterAst) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -3866,9 +3863,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
} else {
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
}
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (astLen > 0) {
pReq->ast = taosMemoryCalloc(1, astLen + 1);
@ -3883,15 +3881,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (astLen > 0) {
pReq->subStbFilterAst = taosMemoryCalloc(1, astLen + 1);
if (pReq->subStbFilterAst == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->subStbFilterAst) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -3900,10 +3889,8 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
taosMemoryFreeClear(pReq->sql);
if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
if (TOPIC_SUB_TYPE__DB != pReq->subType) {
taosMemoryFreeClear(pReq->ast);
}else if(TOPIC_SUB_TYPE__TABLE == pReq->subType) {
taosMemoryFreeClear(pReq->subStbFilterAst);
}
}

View File

@ -5764,7 +5764,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
tNameGetFullDbName(&name, pReq->subDbName);
tNameExtractFullName(&name, pReq->subStbName);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->subStbFilterAst, NULL);
if(pStmt->pQuery != NULL) {
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
} else if ('\0' != pStmt->subDbName[0]) {
pReq->subType = TOPIC_SUB_TYPE__DB;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));