Merge pull request #13304 from taosdata/feature/topic_grammar

feat(tmq): create topic new grammar
This commit is contained in:
Liu Jicong 2022-05-31 18:30:21 +08:00 committed by GitHub
commit ce62042a47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 3023 additions and 2924 deletions

View File

@ -106,7 +106,7 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));

View File

@ -1474,15 +1474,22 @@ typedef struct {
int64_t streamId; int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp; } SMVCreateStreamRsp, SMSCreateStreamRsp;
enum {
TOPIC_SUB_TYPE__DB = 1,
TOPIC_SUB_TYPE__TABLE,
TOPIC_SUB_TYPE__COLUMN,
};
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists; int8_t igExists;
int8_t withTbName; int8_t subType;
int8_t withSchema;
int8_t withTag;
char* sql; char* sql;
char subDbName[TSDB_DB_FNAME_LEN];
union {
char* ast; char* ast;
char subscribeDbName[TSDB_DB_NAME_LEN]; char subStbName[TSDB_TABLE_FNAME_LEN];
};
} SCMCreateTopicReq; } SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
@ -2146,11 +2153,6 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return buf; return buf;
} }
enum {
TOPIC_SUB_TYPE__DB = 1,
TOPIC_SUB_TYPE__TABLE,
};
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t leftForVer; int64_t leftForVer;
@ -2170,9 +2172,9 @@ typedef struct {
int64_t newConsumerId; int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType; int8_t subType;
int8_t withTbName; // int8_t withTbName;
int8_t withSchema; // int8_t withSchema;
int8_t withTag; // int8_t withTag;
char* qmsg; char* qmsg;
} SMqRebVgReq; } SMqRebVgReq;
@ -2184,10 +2186,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey); tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeFixedI8(buf, pReq->subType); tlen += taosEncodeFixedI8(buf, pReq->subType);
tlen += taosEncodeFixedI8(buf, pReq->withTbName); // tlen += taosEncodeFixedI8(buf, pReq->withTbName);
tlen += taosEncodeFixedI8(buf, pReq->withSchema); // tlen += taosEncodeFixedI8(buf, pReq->withSchema);
tlen += taosEncodeFixedI8(buf, pReq->withTag); // tlen += taosEncodeFixedI8(buf, pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
tlen += taosEncodeString(buf, pReq->qmsg); tlen += taosEncodeString(buf, pReq->qmsg);
} }
return tlen; return tlen;
@ -2200,10 +2202,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey); buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeFixedI8(buf, &pReq->subType); buf = taosDecodeFixedI8(buf, &pReq->subType);
buf = taosDecodeFixedI8(buf, &pReq->withTbName); // buf = taosDecodeFixedI8(buf, &pReq->withTbName);
buf = taosDecodeFixedI8(buf, &pReq->withSchema); // buf = taosDecodeFixedI8(buf, &pReq->withSchema);
buf = taosDecodeFixedI8(buf, &pReq->withTag); // buf = taosDecodeFixedI8(buf, &pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeString(buf, &pReq->qmsg);
} }
return (void*)buf; return (void*)buf;

View File

@ -163,97 +163,95 @@
#define TK_AS 145 #define TK_AS 145
#define TK_CONSUMER 146 #define TK_CONSUMER 146
#define TK_GROUP 147 #define TK_GROUP 147
#define TK_WITH 148 #define TK_DESC 148
#define TK_SCHEMA 149 #define TK_DESCRIBE 149
#define TK_DESC 150 #define TK_RESET 150
#define TK_DESCRIBE 151 #define TK_QUERY 151
#define TK_RESET 152 #define TK_CACHE 152
#define TK_QUERY 153 #define TK_EXPLAIN 153
#define TK_CACHE 154 #define TK_ANALYZE 154
#define TK_EXPLAIN 155 #define TK_VERBOSE 155
#define TK_ANALYZE 156 #define TK_NK_BOOL 156
#define TK_VERBOSE 157 #define TK_RATIO 157
#define TK_NK_BOOL 158 #define TK_COMPACT 158
#define TK_RATIO 159 #define TK_VNODES 159
#define TK_COMPACT 160 #define TK_IN 160
#define TK_VNODES 161 #define TK_OUTPUTTYPE 161
#define TK_IN 162 #define TK_AGGREGATE 162
#define TK_OUTPUTTYPE 163 #define TK_BUFSIZE 163
#define TK_AGGREGATE 164 #define TK_STREAM 164
#define TK_BUFSIZE 165 #define TK_INTO 165
#define TK_STREAM 166 #define TK_TRIGGER 166
#define TK_INTO 167 #define TK_AT_ONCE 167
#define TK_TRIGGER 168 #define TK_WINDOW_CLOSE 168
#define TK_AT_ONCE 169 #define TK_WATERMARK 169
#define TK_WINDOW_CLOSE 170 #define TK_KILL 170
#define TK_WATERMARK 171 #define TK_CONNECTION 171
#define TK_KILL 172 #define TK_TRANSACTION 172
#define TK_CONNECTION 173 #define TK_MERGE 173
#define TK_TRANSACTION 174 #define TK_VGROUP 174
#define TK_MERGE 175 #define TK_REDISTRIBUTE 175
#define TK_VGROUP 176 #define TK_SPLIT 176
#define TK_REDISTRIBUTE 177 #define TK_SYNCDB 177
#define TK_SPLIT 178 #define TK_NULL 178
#define TK_SYNCDB 179 #define TK_NK_QUESTION 179
#define TK_NULL 180 #define TK_NK_ARROW 180
#define TK_NK_QUESTION 181 #define TK_ROWTS 181
#define TK_NK_ARROW 182 #define TK_TBNAME 182
#define TK_ROWTS 183 #define TK_QSTARTTS 183
#define TK_TBNAME 184 #define TK_QENDTS 184
#define TK_QSTARTTS 185 #define TK_WSTARTTS 185
#define TK_QENDTS 186 #define TK_WENDTS 186
#define TK_WSTARTTS 187 #define TK_WDURATION 187
#define TK_WENDTS 188 #define TK_CAST 188
#define TK_WDURATION 189 #define TK_NOW 189
#define TK_CAST 190 #define TK_TODAY 190
#define TK_NOW 191 #define TK_TIMEZONE 191
#define TK_TODAY 192 #define TK_COUNT 192
#define TK_TIMEZONE 193 #define TK_FIRST 193
#define TK_COUNT 194 #define TK_LAST 194
#define TK_FIRST 195 #define TK_LAST_ROW 195
#define TK_LAST 196 #define TK_BETWEEN 196
#define TK_LAST_ROW 197 #define TK_IS 197
#define TK_BETWEEN 198 #define TK_NK_LT 198
#define TK_IS 199 #define TK_NK_GT 199
#define TK_NK_LT 200 #define TK_NK_LE 200
#define TK_NK_GT 201 #define TK_NK_GE 201
#define TK_NK_LE 202 #define TK_NK_NE 202
#define TK_NK_GE 203 #define TK_MATCH 203
#define TK_NK_NE 204 #define TK_NMATCH 204
#define TK_MATCH 205 #define TK_CONTAINS 205
#define TK_NMATCH 206 #define TK_JOIN 206
#define TK_CONTAINS 207 #define TK_INNER 207
#define TK_JOIN 208 #define TK_SELECT 208
#define TK_INNER 209 #define TK_DISTINCT 209
#define TK_SELECT 210 #define TK_WHERE 210
#define TK_DISTINCT 211 #define TK_PARTITION 211
#define TK_WHERE 212 #define TK_BY 212
#define TK_PARTITION 213 #define TK_SESSION 213
#define TK_BY 214 #define TK_STATE_WINDOW 214
#define TK_SESSION 215 #define TK_SLIDING 215
#define TK_STATE_WINDOW 216 #define TK_FILL 216
#define TK_SLIDING 217 #define TK_VALUE 217
#define TK_FILL 218 #define TK_NONE 218
#define TK_VALUE 219 #define TK_PREV 219
#define TK_NONE 220 #define TK_LINEAR 220
#define TK_PREV 221 #define TK_NEXT 221
#define TK_LINEAR 222 #define TK_HAVING 222
#define TK_NEXT 223 #define TK_ORDER 223
#define TK_HAVING 224 #define TK_SLIMIT 224
#define TK_ORDER 225 #define TK_SOFFSET 225
#define TK_SLIMIT 226 #define TK_LIMIT 226
#define TK_SOFFSET 227 #define TK_OFFSET 227
#define TK_LIMIT 228 #define TK_ASC 228
#define TK_OFFSET 229 #define TK_NULLS 229
#define TK_ASC 230 #define TK_ID 230
#define TK_NULLS 231 #define TK_NK_BITNOT 231
#define TK_ID 232 #define TK_INSERT 232
#define TK_NK_BITNOT 233 #define TK_VALUES 233
#define TK_INSERT 234 #define TK_IMPORT 234
#define TK_VALUES 235 #define TK_NK_SEMI 235
#define TK_IMPORT 236 #define TK_FILE 236
#define TK_NK_SEMI 237
#define TK_FILE 238
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301

View File

@ -238,20 +238,13 @@ typedef struct SDropComponentNodeStmt {
int32_t dnodeId; int32_t dnodeId;
} SDropComponentNodeStmt; } SDropComponentNodeStmt;
typedef struct STopicOptions {
ENodeType type;
bool withTable;
bool withSchema;
bool withTag;
} STopicOptions;
typedef struct SCreateTopicStmt { typedef struct SCreateTopicStmt {
ENodeType type; ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN]; char topicName[TSDB_TABLE_NAME_LEN];
char subscribeDbName[TSDB_DB_NAME_LEN]; char subDbName[TSDB_DB_NAME_LEN];
char subSTbName[TSDB_TABLE_NAME_LEN];
bool ignoreExists; bool ignoreExists;
SNode* pQuery; SNode* pQuery;
STopicOptions* pOptions;
} SCreateTopicStmt; } SCreateTopicStmt;
typedef struct SDropTopicStmt { typedef struct SDropTopicStmt {

View File

@ -95,7 +95,6 @@ typedef enum ENodeType {
QUERY_NODE_INDEX_OPTIONS, QUERY_NODE_INDEX_OPTIONS,
QUERY_NODE_EXPLAIN_OPTIONS, QUERY_NODE_EXPLAIN_OPTIONS,
QUERY_NODE_STREAM_OPTIONS, QUERY_NODE_STREAM_OPTIONS,
QUERY_NODE_TOPIC_OPTIONS,
QUERY_NODE_LEFT_VALUE, QUERY_NODE_LEFT_VALUE,
// Statement nodes are used in parser and planner module. // Statement nodes are used in parser and planner module.

View File

@ -2668,25 +2668,23 @@ int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *
} }
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) { int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast);
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; if (TOPIC_SUB_TYPE__DB == pReq->subType) {
if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1; } else {
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
}
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
@ -2705,24 +2703,24 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; if (TOPIC_SUB_TYPE__DB == pReq->subType) {
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
} else {
if (tDecodeI32(&decoder, &astLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
if (pReq->sql == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
if (astLen > 0) { if (astLen > 0) {
pReq->ast = taosMemoryCalloc(1, astLen + 1); pReq->ast = taosMemoryCalloc(1, astLen + 1);
if (pReq->ast == NULL) return -1; if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
} else { }
}
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
if (pReq->sql == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
@ -2733,7 +2731,9 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) { void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->sql);
if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
taosMemoryFreeClear(pReq->ast); taosMemoryFreeClear(pReq->ast);
}
} }
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) { int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {

View File

@ -459,10 +459,10 @@ typedef struct {
int64_t uid; int64_t uid;
int64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
int8_t subType; // db or table int8_t subType; // column, db or stable
int8_t withTbName; // int8_t withTbName;
int8_t withSchema; // int8_t withSchema;
int8_t withTag; // int8_t withTag;
SRWLatch lock; SRWLatch lock;
int32_t consumerCnt; int32_t consumerCnt;
int32_t sqlLen; int32_t sqlLen;
@ -530,9 +530,9 @@ typedef struct {
int64_t dbUid; int64_t dbUid;
int32_t vgNum; int32_t vgNum;
int8_t subType; int8_t subType;
int8_t withTbName; // int8_t withTbName;
int8_t withSchema; // int8_t withSchema;
int8_t withTag; // int8_t withTag;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*> SArray* unassignedVgs; // SArray<SMqVgEp*>
} SMqSubscribeObj; } SMqSubscribeObj;

View File

@ -396,9 +396,9 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->dbUid = pSub->dbUid; pSubNew->dbUid = pSub->dbUid;
pSubNew->subType = pSub->subType; pSubNew->subType = pSub->subType;
pSubNew->withTbName = pSub->withTbName; /*pSubNew->withTbName = pSub->withTbName;*/
pSubNew->withSchema = pSub->withSchema; /*pSubNew->withSchema = pSub->withSchema;*/
pSubNew->withTag = pSub->withTag; /*pSubNew->withTag = pSub->withTag;*/
pSubNew->vgNum = pSub->vgNum; pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
@ -431,9 +431,9 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI64(buf, pSub->dbUid); tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum); tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType); tlen += taosEncodeFixedI8(buf, pSub->subType);
tlen += taosEncodeFixedI8(buf, pSub->withTbName); /*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
tlen += taosEncodeFixedI8(buf, pSub->withSchema); /*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
tlen += taosEncodeFixedI8(buf, pSub->withTag); /*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
void *pIter = NULL; void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash); int32_t sz = taosHashGetSize(pSub->consumerHash);
@ -458,9 +458,9 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI64(buf, &pSub->dbUid); buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum); buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType); buf = taosDecodeFixedI8(buf, &pSub->subType);
buf = taosDecodeFixedI8(buf, &pSub->withTbName); /*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
buf = taosDecodeFixedI8(buf, &pSub->withSchema); /*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
buf = taosDecodeFixedI8(buf, &pSub->withTag); /*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);

View File

@ -507,7 +507,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SQueryPlan* pPlan = NULL; SQueryPlan* pPlan = NULL;
SSubplan* plan = NULL; SSubplan* plan = NULL;
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
pPlan = qStringToQueryPlan(pTopic->physicalPlan); pPlan = qStringToQueryPlan(pTopic->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
@ -553,7 +553,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId); mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t msgLen; int32_t msgLen;
plan->execNode.epSet = pVgEp->epSet; plan->execNode.epSet = pVgEp->epSet;

View File

@ -93,9 +93,9 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
} }
pSub->dbUid = pTopic->dbUid; pSub->dbUid = pTopic->dbUid;
pSub->subType = pTopic->subType; pSub->subType = pTopic->subType;
pSub->withTbName = pTopic->withTbName; /*pSub->withTbName = pTopic->withTbName;*/
pSub->withSchema = pTopic->withSchema; /*pSub->withSchema = pTopic->withSchema;*/
pSub->withTag = pTopic->withTag; /*pSub->withTag = pTopic->withTag;*/
ASSERT(pSub->unassignedVgs->size == 0); ASSERT(pSub->unassignedVgs->size == 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0); ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
@ -120,9 +120,9 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.vgId = pRebVg->pVgEp->vgId; req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg; req.qmsg = pRebVg->pVgEp->qmsg;
req.subType = pSub->subType; req.subType = pSub->subType;
req.withTbName = pSub->withTbName; /*req.withTbName = pSub->withTbName;*/
req.withSchema = pSub->withSchema; /*req.withSchema = pSub->withSchema;*/
req.withTag = pSub->withTag; /*req.withTag = pSub->withTag;*/
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);

View File

@ -96,9 +96,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); /*SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);*/
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); /*SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);*/
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); /*SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);*/
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
@ -168,9 +168,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); /*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);*/
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); /*SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);*/
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); /*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);*/
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
@ -308,11 +308,19 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
} }
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_TOPIC; terrno = TSDB_CODE_MND_INVALID_TOPIC;
return -1;
if (pCreate->sql == NULL) return -1;
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
if (pCreate->subStbName[0] == 0) return -1;
} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
if (pCreate->subDbName[0] == 0) return -1;
} }
terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
@ -328,14 +336,13 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.sqlLen = strlen(pCreate->sql) + 1;
/*topicObj.refConsumerCnt = 0;*/ topicObj.subType = pCreate->subType;
if (pCreate->ast && pCreate->ast[0]) { if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE; /*topicObj.withTbName = pCreate->withTbName;*/
topicObj.withTbName = pCreate->withTbName; /*topicObj.withSchema = pCreate->withSchema;*/
topicObj.withSchema = pCreate->withSchema;
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) { if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
@ -368,13 +375,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
return -1; return -1;
} }
} else { /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
topicObj.ast = NULL; /*topicObj.ast = NULL;*/
topicObj.astLen = 0; /*topicObj.astLen = 0;*/
topicObj.physicalPlan = NULL; /*topicObj.physicalPlan = NULL;*/
topicObj.subType = TOPIC_SUB_TYPE__DB; /*topicObj.withTbName = 1;*/
topicObj.withTbName = 1; /*topicObj.withSchema = 1;*/
topicObj.withSchema = 1;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
@ -442,7 +448,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto CREATE_TOPIC_OVER; goto CREATE_TOPIC_OVER;
} }
pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName); pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto CREATE_TOPIC_OVER; goto CREATE_TOPIC_OVER;

View File

@ -51,6 +51,7 @@ target_sources(
# tq # tq
"src/tq/tq.c" "src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqCommit.c" "src/tq/tqCommit.c"
"src/tq/tqOffset.c" "src/tq/tqOffset.c"
"src/tq/tqPush.c" "src/tq/tqPush.c"

View File

@ -44,21 +44,27 @@ extern "C" {
typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetCfg STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
// tqRead
struct STqReadHandle { struct STqReadHandle {
int64_t ver; int64_t ver;
SHashObj* tbIdHash;
const SSubmitReq* pMsg; const SSubmitReq* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
SHashObj* tbIdHash;
SArray* pColIdList; // SArray<int16_t> SArray* pColIdList; // SArray<int16_t>
int32_t sver;
int32_t cachedSchemaVer;
int64_t cachedSchemaUid; int64_t cachedSchemaUid;
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema; STSchema* pSchema;
}; };
// tqPush
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
@ -68,14 +74,15 @@ typedef struct {
SRpcMsg* handle; SRpcMsg* handle;
} STqPushHandle; } STqPushHandle;
#if 0
typedef struct { typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
int8_t subType; int8_t subType;
int8_t withTbName; // int8_t withTbName;
int8_t withSchema; // int8_t withSchema;
int8_t withTag; // int8_t withTag;
char* qmsg; char* qmsg;
SHashObj* pDropTbUid; SHashObj* pDropTbUid;
STqPushHandle pushHandle; STqPushHandle pushHandle;
@ -85,15 +92,55 @@ typedef struct {
STqReadHandle* pExecReader[5]; STqReadHandle* pExecReader[5];
qTaskInfo_t task[5]; qTaskInfo_t task[5];
} STqExec; } STqExec;
#endif
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec); // tqExec
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
typedef struct {
char* qmsg;
qTaskInfo_t task[5];
} STqExecCol;
typedef struct {
int64_t suid;
} STqExecTb;
typedef struct {
SHashObj* pFilterOutTbUid;
} STqExecDb;
typedef struct {
int8_t subType;
STqReadHandle* pExecReader[5];
union {
STqExecCol execCol;
STqExecTb execTb;
STqExecDb execDb;
} exec;
} STqExecHandle;
typedef struct {
// info
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
// reader
SWalReadHandle* pWalReader;
// push
STqPushHandle pushHandle;
// exec
STqExecHandle execHandle;
} STqHandle;
struct STQ { struct STQ {
char* path; char* path;
SHashObj* pushMgr; // consumerId -> STqExec* SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* execs; // subKey -> STqExec SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; SHashObj* pStreamTasks; // taksId -> SStreamTask
SVnode* pVnode; SVnode* pVnode;
SWal* pWal; SWal* pWal;
TDB* pMetaStore; TDB* pMetaStore;
@ -111,6 +158,16 @@ static STqMgmt tqMgmt = {0};
int tqInit(); int tqInit();
void tqCleanUp(); void tqCleanUp();
// int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
// int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum);
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
// tqOffset // tqOffset
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
void STqOffsetClose(STqOffsetStore*); void STqOffsetClose(STqOffsetStore*);

View File

@ -51,10 +51,10 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
return strcmp(pKey1, pKey2); return strcmp(pKey1, pKey2);
} }
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) { int32_t tqStoreHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t code; int32_t code;
int32_t vlen; int32_t vlen;
tEncodeSize(tEncodeSTqExec, pExec, vlen, code); tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
ASSERT(code == 0); ASSERT(code == 0);
void* buf = taosMemoryCalloc(1, vlen); void* buf = taosMemoryCalloc(1, vlen);
@ -65,7 +65,7 @@ int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, buf, vlen); tEncoderInit(&encoder, buf, vlen);
if (tEncodeSTqExec(&encoder, pExec) < 0) { if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
ASSERT(0); ASSERT(0);
} }
@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
@ -112,7 +112,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
ASSERT(0); ASSERT(0);
} }
if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) { if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0); ASSERT(0);
} }
@ -122,10 +122,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
ASSERT(0); ASSERT(0);
} }
/*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/
/*ASSERT(0);*/
/*}*/
TBC* pCur; TBC* pCur;
if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
ASSERT(0); ASSERT(0);
@ -138,30 +134,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
SDecoder decoder; SDecoder decoder;
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqExec exec;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqExec(&decoder, &exec);
exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
.reader = exec.pExecReader[i], STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = {
.reader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
}; };
exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle); handle.execHandle.exec.execCol.task[i] =
ASSERT(exec.task[i]); qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader);
ASSERT(handle.execHandle.exec.execCol.task[i]);
} }
} else { } else {
for (int32_t i = 0; i < 5; i++) { handle.execHandle.exec.execDb.pFilterOutTbUid =
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
}
taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
} }
if (tdbTxnClose(&txn) < 0) { if (tdbTxnClose(&txn) < 0) {
@ -174,7 +171,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
if (pTq) { if (pTq) {
taosMemoryFreeClear(pTq->path); taosMemoryFreeClear(pTq->path);
taosHashCleanup(pTq->execs); taosHashCleanup(pTq->handles);
taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pushMgr);
tdbClose(pTq->pMetaStore); tdbClose(pTq->pMetaStore);
@ -183,16 +180,17 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
#if 0
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1; if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1; if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1; if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1; if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1; /*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/
if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1; /*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1; /*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
@ -205,34 +203,64 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1; if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1; if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1; if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1; /*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/
if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1; /*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1; /*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
#endif
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pHandle->execHandle.exec.execCol.qmsg) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.exec.execCol.qmsg) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->execs, pIter); pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
STqExec* pExec = (STqExec*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.exec.execCol.task[i], tbUidList, isAdd);
ASSERT(code == 0);
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList); int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0); taosHashPut(pExec->execHandle.exec.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
} }
} }
} else { } else {
for (int32_t i = 0; i < 5; i++) { // tq update id
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0);
}
} }
} }
while (1) { while (1) {
@ -250,7 +278,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL; void* pIter = NULL;
STqExec* pExec = NULL; STqHandle* pHandle = NULL;
SSubmitReq* pReq = (SSubmitReq*)msg; SSubmitReq* pReq = (SSubmitReq*)msg;
int32_t workerId = 4; int32_t workerId = 4;
int64_t fetchOffset = ver; int64_t fetchOffset = ver;
@ -258,84 +286,27 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
while (1) { while (1) {
pIter = taosHashIterate(pTq->pushMgr, pIter); pIter = taosHashIterate(pTq->pushMgr, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
pExec = *(STqExec**)pIter; pHandle = *(STqHandle**)pIter;
taosWLockLatch(&pExec->pushHandle.lock); taosWLockLatch(&pHandle->pushHandle.lock);
SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle); SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle);
ASSERT(pMsg); ASSERT(pMsg);
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pExec->pushHandle.reqOffset; rsp.reqOffset = pHandle->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (msgType == TDMT_VND_SUBMIT) {
qTaskInfo_t task = pExec->task[workerId]; tqDataExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
ASSERT(0);
}
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
/*pRetrieve->useconds = 0;*/
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(block.info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else { } else {
// TODO
ASSERT(0); ASSERT(0);
} }
if (rsp.blockNum == 0) { if (rsp.blockNum == 0) {
taosWUnLockLatch(&pExec->pushHandle.lock); taosWUnLockLatch(&pHandle->pushHandle.lock);
continue; continue;
} }
@ -352,8 +323,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch; ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId; ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp); tEncodeSMqDataBlkRsp(&abuf, &rsp);
@ -361,11 +332,11 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0}; SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp); tmsgSendRsp(&resp);
atomic_store_ptr(&pExec->pushHandle.handle, NULL); atomic_store_ptr(&pHandle->pushHandle.handle, NULL);
taosWUnLockLatch(&pExec->pushHandle.lock); taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset); rsp.reqOffset, rsp.rspOffset);
// TODO destroy // TODO destroy
@ -419,12 +390,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(pExec); ASSERT(pHandle);
int32_t consumerEpoch = atomic_load_32(&pExec->epoch); int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) { while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch); consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
} }
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
@ -432,50 +403,46 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return -1; return -1;
} }
walSetReaderCapacity(pExec->pWalReader, 2048); walSetReaderCapacity(pHandle->pWalReader, 2048);
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*)); rsp.blockTbName = taosArrayInit(0, sizeof(void*));
int8_t withTbName = pExec->withTbName; rsp.withTbName = pReq->withTbName;
if (pReq->withTbName != -1) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
withTbName = pReq->withTbName; rsp.withSchema = false;
rsp.withTag = false;
} else {
rsp.withSchema = true;
rsp.withTag = false;
} }
rsp.withTbName = withTbName;
/*int8_t withTbName = pExec->withTbName;*/
/*if (pReq->withTbName != -1) {*/
/*withTbName = pReq->withTbName;*/
/*}*/
/*rsp.withTbName = withTbName;*/
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch); consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break; break;
} }
taosThreadMutexLock(&pExec->pWalReader->mutex); if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
// TODO add push mgr
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
break; break;
} }
if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
ASSERT(walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum) == 0);
} else {
ASSERT(walFetchBody(pExec->pWalReader, &pHeadWithCkSum) == 0);
}
SWalReadHead* pHead = &pHeadWithCkSum->head; SWalReadHead* pHead = &pHeadWithCkSum->head;
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
#if 0 #if 0
SWalReadHead* pHead; SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
@ -515,122 +482,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// table subscribe
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId];
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0); tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId);
ASSERT(pDataBlock->info.numOfCols != 0);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
if (pExec->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
}
if (withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
rsp.blockNum++;
}
// db subscribe
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
/*pRetrieve->useconds = 0;*/
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(block.info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
if (withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
rsp.blockNum++;
}
} else { } else {
// TODO
ASSERT(0); ASSERT(0);
} }
}
// TODO batch optimization: // TODO batch optimization:
// TODO continue scan until meeting batch requirement // TODO continue scan until meeting batch requirement
if (rsp.blockNum != 0) break; if (rsp.blockNum > 0 /* threshold */) {
rsp.skipLogNum++; break;
} else {
fetchOffset++; fetchOffset++;
} }
}
taosMemoryFree(pHeadWithCkSum); taosMemoryFree(pHeadWithCkSum);
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
if (rsp.blockNum != 0)
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
else
rsp.rspOffset = fetchOffset - 1;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
@ -646,13 +519,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp); tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp); tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO destroy // TODO wrap in destroy func
taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen); taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
@ -664,7 +542,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); ASSERT(code == 0);
TXN txn; TXN txn;
@ -693,63 +571,59 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
if (pExec == NULL) { if (pHandle == NULL) {
ASSERT(req.oldConsumerId == -1); ASSERT(req.oldConsumerId == -1);
ASSERT(req.newConsumerId != -1); ASSERT(req.newConsumerId != -1);
STqExec exec = {0}; STqHandle tqHandle = {0};
pExec = &exec; pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/ /*taosInitRWLatch(&pExec->lock);*/
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pExec->consumerId = req.newConsumerId; pHandle->consumerId = req.newConsumerId;
pExec->epoch = -1; pHandle->epoch = -1;
pExec->subType = req.subType; pHandle->execHandle.subType = req.subType;
pExec->withTbName = req.withTbName; /*pExec->withTbName = req.withTbName;*/
pExec->withSchema = req.withSchema; /*pExec->withSchema = req.withSchema;*/
pExec->withTag = req.withTag; /*pExec->withTag = req.withTag;*/
pExec->qmsg = req.qmsg; pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pExec->pExecReader[i], .reader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pHandle->execHandle.exec.execCol.task[i] =
ASSERT(pExec->task[i]); qCreateStreamExecTaskInfo(pHandle->execHandle.exec.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.exec.execCol.task[i]);
} }
} else { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.exec.execDb.pFilterOutTbUid =
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
} }
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
}
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
// TODO
}
return 0;
} else { } else {
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/ /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
// TODO handle qmsg and exec modification // TODO handle qmsg and exec modification
atomic_store_32(&pExec->epoch, -1); atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pExec->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pExec->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
}
if (tqStoreExec(pTq, req.subKey, pExec) < 0) { if (tqStoreHandle(pTq, req.subKey, pHandle) < 0) {
// TODO // TODO
} }
return 0; return 0;
}
} }
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {

View File

@ -0,0 +1,124 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRsp) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf);
return 0;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataBlkRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(pRsp->blockSchema, &pSW);
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
return -1;
}
char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
metaReaderClear(&mr);
return 0;
}
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
qTaskInfo_t task = pExec->exec.execCol.task[workerId];
ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
}
pRsp->blockNum++;
}
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
}
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->exec.execDb.pFilterOutTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
}
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
}
if (pRsp->blockNum == 0) {
pRsp->skipLogNum++;
return -1;
}
return 0;
}

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -15,6 +15,48 @@
#include "tq.h" #include "tq.h"
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset;
while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
pHandle->epoch, TD_VID(pTq->pVnode), offset);
*fetchOffset = offset - 1;
code = -1;
goto END;
}
if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);
*fetchOffset = offset;
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END;
} else {
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);
*fetchOffset = offset;
code = -1;
goto END;
}
offset++;
}
}
END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
return code;
}
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle)); STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
@ -24,7 +66,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL; pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1; pReadHandle->cachedSchemaVer = -1;
pReadHandle->cachedSchemaUid = -1; pReadHandle->cachedSchemaUid = -1;
pReadHandle->pSchema = NULL; pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL; pReadHandle->pSchemaWrapper = NULL;
@ -88,11 +130,11 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion // TODO set to real sversion
/*int32_t sversion = 1;*/ /*int32_t sversion = 1;*/
int32_t sversion = htonl(pHandle->pBlock->sversion); int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { if (pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) { if (pHandle->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table", tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->sver); pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->cachedSchemaVer);
/*ASSERT(0);*/ /*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
@ -102,12 +144,12 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) { if (pHandle->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table", tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
pHandle->msgIter.suid, pHandle->sver); pHandle->msgIter.suid, pHandle->cachedSchemaVer);
/*ASSERT(0);*/ /*ASSERT(0);*/
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
} }
pHandle->sver = sversion; pHandle->cachedSchemaVer = sversion;
pHandle->cachedSchemaUid = pHandle->msgIter.suid; pHandle->cachedSchemaUid = pHandle->msgIter.suid;
} }

View File

@ -3289,7 +3289,7 @@ static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) {
int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName); int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName); code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subDbName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists); code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
@ -3306,7 +3306,7 @@ static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName); int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName); code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subDbName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists); code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);

View File

@ -86,8 +86,6 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SExplainOptions)); return makeNode(type, sizeof(SExplainOptions));
case QUERY_NODE_STREAM_OPTIONS: case QUERY_NODE_STREAM_OPTIONS:
return makeNode(type, sizeof(SStreamOptions)); return makeNode(type, sizeof(SStreamOptions));
case QUERY_NODE_TOPIC_OPTIONS:
return makeNode(type, sizeof(STopicOptions));
case QUERY_NODE_LEFT_VALUE: case QUERY_NODE_LEFT_VALUE:
return makeNode(type, sizeof(SLeftValueNode)); return makeNode(type, sizeof(SLeftValueNode));
case QUERY_NODE_SET_OPERATOR: case QUERY_NODE_SET_OPERATOR:

View File

@ -167,7 +167,7 @@ SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, co
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId); SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createTopicOptions(SAstCreateContext* pCxt); SNode* createTopicOptions(SAstCreateContext* pCxt);
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
const SToken* pSubscribeDbName, SNode* pOptions); const SToken* pSubDbName, SNode* pRealTable);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName); SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId,
const SToken* pTopicName); const SToken* pTopicName);

View File

@ -402,18 +402,13 @@ func_list(A) ::= func_list(B) NK_COMMA func(C).
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); } func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
/************************************************ create/drop topic ***************************************************/ /************************************************ create/drop topic ***************************************************/
cmd ::= CREATE TOPIC not_exists_opt(A) cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, C, NULL, NULL); }
topic_name(B) topic_options(D) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, C, NULL, D); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C, NULL); }
cmd ::= CREATE TOPIC not_exists_opt(A) cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
topic_name(B) topic_options(D) AS db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C, D); } AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, NULL, C); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); } cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); } cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
topic_options(A) ::= . { A = createTopicOptions(pCxt); }
topic_options(A) ::= topic_options(B) WITH TABLE. { ((STopicOptions*)B)->withTable = true; A = B; }
topic_options(A) ::= topic_options(B) WITH SCHEMA. { ((STopicOptions*)B)->withSchema = true; A = B; }
topic_options(A) ::= topic_options(B) WITH TAG. { ((STopicOptions*)B)->withTag = true; A = B; }
/************************************************ desc/describe *******************************************************/ /************************************************ desc/describe *******************************************************/
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); } cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
cmd ::= DESCRIBE full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); } cmd ::= DESCRIBE full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }

View File

@ -1261,28 +1261,22 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createTopicOptions(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt);
STopicOptions* pOptions = nodesMakeNode(QUERY_NODE_TOPIC_OPTIONS);
CHECK_OUT_OF_MEM(pOptions);
pOptions->withTable = false;
pOptions->withSchema = false;
pOptions->withTag = false;
return (SNode*)pOptions;
}
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
const SToken* pSubscribeDbName, SNode* pOptions) { const SToken* pSubDbName, SNode* pRealTable) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SCreateTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT); SCreateTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
strncpy(pStmt->topicName, pTopicName->z, pTopicName->n); strncpy(pStmt->topicName, pTopicName->z, pTopicName->n);
pStmt->ignoreExists = ignoreExists; pStmt->ignoreExists = ignoreExists;
if (NULL != pRealTable) {
strcpy(pStmt->subDbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->subSTbName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
} else if (NULL != pSubDbName) {
strncpy(pStmt->subDbName, pSubDbName->z, pSubDbName->n);
} else {
pStmt->pQuery = pQuery; pStmt->pQuery = pQuery;
if (NULL != pSubscribeDbName) {
strncpy(pStmt->subscribeDbName, pSubscribeDbName->z, pSubscribeDbName->n);
} }
pStmt->pOptions = (STopicOptions*)pOptions;
return (SNode*)pStmt; return (SNode*)pStmt;
} }

View File

@ -155,7 +155,6 @@ static SKeyword keywordTable[] = {
{"RETENTIONS", TK_RETENTIONS}, {"RETENTIONS", TK_RETENTIONS},
{"REVOKE", TK_REVOKE}, {"REVOKE", TK_REVOKE},
{"ROLLUP", TK_ROLLUP}, {"ROLLUP", TK_ROLLUP},
{"SCHEMA", TK_SCHEMA},
{"SCHEMALESS", TK_SCHEMALESS}, {"SCHEMALESS", TK_SCHEMALESS},
{"SCORES", TK_SCORES}, {"SCORES", TK_SCORES},
{"SELECT", TK_SELECT}, {"SELECT", TK_SELECT},
@ -213,7 +212,6 @@ static SKeyword keywordTable[] = {
{"WATERMARK", TK_WATERMARK}, {"WATERMARK", TK_WATERMARK},
{"WHERE", TK_WHERE}, {"WHERE", TK_WHERE},
{"WINDOW_CLOSE", TK_WINDOW_CLOSE}, {"WINDOW_CLOSE", TK_WINDOW_CLOSE},
{"WITH", TK_WITH},
{"WRITE", TK_WRITE}, {"WRITE", TK_WRITE},
{"_C0", TK_ROWTS}, {"_C0", TK_ROWTS},
{"_QENDTS", TK_QENDTS}, {"_QENDTS", TK_QENDTS},

View File

@ -3299,9 +3299,6 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, pReq->name); tNameGetFullDbName(&name, pReq->name);
pReq->igExists = pStmt->ignoreExists; pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
pReq->withTag = pStmt->pOptions->withTag;
pReq->sql = strdup(pCxt->pParseCxt->pSql); pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) { if (NULL == pReq->sql) {
@ -3310,19 +3307,25 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* dbName; if ('\0' != pStmt->subSTbName[0]) {
if (NULL != pStmt->pQuery) { pReq->subType = TOPIC_SUB_TYPE__TABLE;
dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName; toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
tNameExtractFullName(&name, pReq->subStbName);
} else if ('\0' != pStmt->subDbName[0]) {
pReq->subType = TOPIC_SUB_TYPE__DB;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));
tNameGetFullDbName(&name, pReq->subDbName);
} else {
pReq->subType = TOPIC_SUB_TYPE__COLUMN;
char* dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
tNameGetFullDbName(&name, pReq->subDbName);
pCxt->pParseCxt->topicQuery = true; pCxt->pParseCxt->topicQuery = true;
code = translateQuery(pCxt, pStmt->pQuery); code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
} }
} else {
dbName = pStmt->subscribeDbName;
} }
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
tNameGetFullDbName(&name, pReq->subscribeDbName);
return code; return code;
} }

File diff suppressed because it is too large Load Diff

View File

@ -475,13 +475,62 @@ TEST_F(ParserInitialCTest, createTable) {
TEST_F(ParserInitialCTest, createTopic) { TEST_F(ParserInitialCTest, createTopic) {
useDb("root", "test"); useDb("root", "test");
SCMCreateTopicReq expect = {0};
auto setCreateTopicReqFunc = [&](const char* pTopicName, int8_t igExists, const char* pSql, const char* pAst,
const char* pDbName = nullptr, const char* pTbname = nullptr) {
memset(&expect, 0, sizeof(SMCreateStbReq));
snprintf(expect.name, sizeof(expect.name), "0.%s", pTopicName);
expect.igExists = igExists;
expect.sql = (char*)pSql;
if (nullptr != pTbname) {
expect.subType = TOPIC_SUB_TYPE__TABLE;
snprintf(expect.subStbName, sizeof(expect.subStbName), "0.%s.%s", pDbName, pTbname);
} else if (nullptr != pAst) {
expect.subType = TOPIC_SUB_TYPE__COLUMN;
expect.ast = (char*)pAst;
} else {
expect.subType = TOPIC_SUB_TYPE__DB;
snprintf(expect.subDbName, sizeof(expect.subDbName), "0.%s", pDbName);
}
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_TOPIC_STMT);
SCMCreateTopicReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS ==
tDeserializeSCMCreateTopicReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(req.subType, expect.subType);
ASSERT_EQ(std::string(req.sql), std::string(expect.sql));
switch (expect.subType) {
case TOPIC_SUB_TYPE__DB:
ASSERT_EQ(std::string(req.subDbName), std::string(expect.subDbName));
break;
case TOPIC_SUB_TYPE__TABLE:
ASSERT_EQ(std::string(req.subStbName), std::string(expect.subStbName));
break;
case TOPIC_SUB_TYPE__COLUMN:
ASSERT_NE(req.ast, nullptr);
break;
default:
ASSERT_TRUE(false);
}
});
setCreateTopicReqFunc("tp1", 0, "create topic tp1 as select * from t1", "ast");
run("CREATE TOPIC tp1 AS SELECT * FROM t1"); run("CREATE TOPIC tp1 AS SELECT * FROM t1");
run("CREATE TOPIC IF NOT EXISTS tp1 AS SELECT * FROM t1"); setCreateTopicReqFunc("tp1", 1, "create topic if not exists tp1 as select ts, ceil(c1) from t1", "ast");
run("CREATE TOPIC IF NOT EXISTS tp1 AS SELECT ts, CEIL(c1) FROM t1");
run("CREATE TOPIC tp1 AS test"); setCreateTopicReqFunc("tp1", 0, "create topic tp1 as database test", nullptr, "test");
run("CREATE TOPIC tp1 AS DATABASE test");
run("CREATE TOPIC IF NOT EXISTS tp1 AS test"); setCreateTopicReqFunc("tp1", 1, "create topic if not exists tp1 as stable st1", nullptr, "test", "st1");
run("CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1");
} }
TEST_F(ParserInitialCTest, createUser) { TEST_F(ParserInitialCTest, createUser) {

View File

@ -182,7 +182,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1 topicList = topicName1
@ -223,7 +223,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1 topicList = topicName1
@ -279,7 +279,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -343,7 +343,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -427,7 +427,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]

View File

@ -195,7 +195,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
@ -272,7 +272,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
@ -358,8 +358,8 @@ class TDTestCase:
topicName1 = 'topic_db60' topicName1 = 'topic_db60'
topicName2 = 'topic_db61' topicName2 = 'topic_db61'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
@ -443,8 +443,8 @@ class TDTestCase:
topicName1 = 'topic_db60' topicName1 = 'topic_db60'
topicName2 = 'topic_db61' topicName2 = 'topic_db61'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]

View File

@ -183,7 +183,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
topicList = topicName1 topicList = topicName1
@ -261,7 +261,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
topicList = topicName1 topicList = topicName1
@ -339,7 +339,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1 topicList = topicName1
@ -411,7 +411,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1 topicList = topicName1