feat(tmq): create topic new grammar
This commit is contained in:
parent
2248bc4207
commit
451d19f986
|
@ -106,8 +106,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -2170,10 +2170,10 @@ typedef struct {
|
||||||
int64_t newConsumerId;
|
int64_t newConsumerId;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
int8_t subType;
|
int8_t subType;
|
||||||
int8_t withTbName;
|
// int8_t withTbName;
|
||||||
int8_t withSchema;
|
// int8_t withSchema;
|
||||||
int8_t withTag;
|
// int8_t withTag;
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
} SMqRebVgReq;
|
} SMqRebVgReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
|
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
|
||||||
|
@ -2184,10 +2184,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
||||||
tlen += taosEncodeString(buf, pReq->subKey);
|
tlen += taosEncodeString(buf, pReq->subKey);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->subType);
|
tlen += taosEncodeFixedI8(buf, pReq->subType);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
// tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->withSchema);
|
// tlen += taosEncodeFixedI8(buf, pReq->withSchema);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->withTag);
|
// tlen += taosEncodeFixedI8(buf, pReq->withTag);
|
||||||
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
tlen += taosEncodeString(buf, pReq->qmsg);
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -2200,10 +2200,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
||||||
buf = taosDecodeStringTo(buf, pReq->subKey);
|
buf = taosDecodeStringTo(buf, pReq->subKey);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->subType);
|
buf = taosDecodeFixedI8(buf, &pReq->subType);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
// buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->withSchema);
|
// buf = taosDecodeFixedI8(buf, &pReq->withSchema);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->withTag);
|
// buf = taosDecodeFixedI8(buf, &pReq->withTag);
|
||||||
if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
buf = taosDecodeString(buf, &pReq->qmsg);
|
||||||
}
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
|
|
|
@ -454,17 +454,17 @@ int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
||||||
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int8_t subType; // db or table
|
int8_t subType; // column, db or stable
|
||||||
int8_t withTbName;
|
// int8_t withTbName;
|
||||||
int8_t withSchema;
|
// int8_t withSchema;
|
||||||
int8_t withTag;
|
// int8_t withTag;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int32_t consumerCnt;
|
int32_t consumerCnt;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
|
@ -527,14 +527,14 @@ int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
||||||
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
int8_t subType;
|
int8_t subType;
|
||||||
int8_t withTbName;
|
// int8_t withTbName;
|
||||||
int8_t withSchema;
|
// int8_t withSchema;
|
||||||
int8_t withTag;
|
// int8_t withTag;
|
||||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
||||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||||
} SMqSubscribeObj;
|
} SMqSubscribeObj;
|
||||||
|
|
|
@ -396,9 +396,9 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
|
|
||||||
pSubNew->dbUid = pSub->dbUid;
|
pSubNew->dbUid = pSub->dbUid;
|
||||||
pSubNew->subType = pSub->subType;
|
pSubNew->subType = pSub->subType;
|
||||||
pSubNew->withTbName = pSub->withTbName;
|
/*pSubNew->withTbName = pSub->withTbName;*/
|
||||||
pSubNew->withSchema = pSub->withSchema;
|
/*pSubNew->withSchema = pSub->withSchema;*/
|
||||||
pSubNew->withTag = pSub->withTag;
|
/*pSubNew->withTag = pSub->withTag;*/
|
||||||
|
|
||||||
pSubNew->vgNum = pSub->vgNum;
|
pSubNew->vgNum = pSub->vgNum;
|
||||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
@ -431,9 +431,9 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
|
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
|
||||||
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->subType);
|
tlen += taosEncodeFixedI8(buf, pSub->subType);
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->withTbName);
|
/*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->withSchema);
|
/*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->withTag);
|
/*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int32_t sz = taosHashGetSize(pSub->consumerHash);
|
int32_t sz = taosHashGetSize(pSub->consumerHash);
|
||||||
|
@ -458,9 +458,9 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
||||||
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
||||||
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->subType);
|
buf = taosDecodeFixedI8(buf, &pSub->subType);
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->withTbName);
|
/*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->withSchema);
|
/*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->withTag);
|
/*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
|
||||||
|
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
|
|
@ -506,7 +506,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
SQueryPlan* pPlan = NULL;
|
SQueryPlan* pPlan = NULL;
|
||||||
SSubplan* plan = NULL;
|
SSubplan* plan = NULL;
|
||||||
|
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -552,7 +552,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
|
|
||||||
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||||
|
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
|
||||||
plan->execNode.epSet = pVgEp->epSet;
|
plan->execNode.epSet = pVgEp->epSet;
|
||||||
|
|
|
@ -93,9 +93,9 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
||||||
}
|
}
|
||||||
pSub->dbUid = pTopic->dbUid;
|
pSub->dbUid = pTopic->dbUid;
|
||||||
pSub->subType = pTopic->subType;
|
pSub->subType = pTopic->subType;
|
||||||
pSub->withTbName = pTopic->withTbName;
|
/*pSub->withTbName = pTopic->withTbName;*/
|
||||||
pSub->withSchema = pTopic->withSchema;
|
/*pSub->withSchema = pTopic->withSchema;*/
|
||||||
pSub->withTag = pTopic->withTag;
|
/*pSub->withTag = pTopic->withTag;*/
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs->size == 0);
|
ASSERT(pSub->unassignedVgs->size == 0);
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||||
|
@ -120,9 +120,9 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
req.vgId = pRebVg->pVgEp->vgId;
|
req.vgId = pRebVg->pVgEp->vgId;
|
||||||
req.qmsg = pRebVg->pVgEp->qmsg;
|
req.qmsg = pRebVg->pVgEp->qmsg;
|
||||||
req.subType = pSub->subType;
|
req.subType = pSub->subType;
|
||||||
req.withTbName = pSub->withTbName;
|
/*req.withTbName = pSub->withTbName;*/
|
||||||
req.withSchema = pSub->withSchema;
|
/*req.withSchema = pSub->withSchema;*/
|
||||||
req.withTag = pSub->withTag;
|
/*req.withTag = pSub->withTag;*/
|
||||||
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
||||||
|
|
|
@ -96,9 +96,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
|
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);*/
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
|
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);*/
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
|
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);*/
|
||||||
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
|
@ -168,9 +168,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
|
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
|
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);*/
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
|
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);*/
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
|
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);*/
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
|
@ -308,11 +308,19 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subDbName[0] == 0) {
|
terrno = TSDB_CODE_MND_INVALID_TOPIC;
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC;
|
|
||||||
return -1;
|
if (pCreate->sql == NULL) return -1;
|
||||||
|
|
||||||
|
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
|
||||||
|
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
if (pCreate->subStbName[0] == 0) return -1;
|
||||||
|
} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
if (pCreate->subDbName[0] == 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,12 +336,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
topicObj.version = 1;
|
topicObj.version = 1;
|
||||||
topicObj.sql = strdup(pCreate->sql);
|
topicObj.sql = strdup(pCreate->sql);
|
||||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||||
/*topicObj.refConsumerCnt = 0;*/
|
topicObj.subType = pCreate->subType;
|
||||||
|
|
||||||
if (pCreate->ast && pCreate->ast[0]) {
|
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
topicObj.ast = strdup(pCreate->ast);
|
topicObj.ast = strdup(pCreate->ast);
|
||||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
|
||||||
/*topicObj.withTbName = pCreate->withTbName;*/
|
/*topicObj.withTbName = pCreate->withTbName;*/
|
||||||
/*topicObj.withSchema = pCreate->withSchema;*/
|
/*topicObj.withSchema = pCreate->withSchema;*/
|
||||||
|
|
||||||
|
@ -368,13 +375,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFree(topicObj.sql);
|
taosMemoryFree(topicObj.sql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||||
topicObj.ast = NULL;
|
/*topicObj.ast = NULL;*/
|
||||||
topicObj.astLen = 0;
|
/*topicObj.astLen = 0;*/
|
||||||
topicObj.physicalPlan = NULL;
|
/*topicObj.physicalPlan = NULL;*/
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__DB;
|
/*topicObj.withTbName = 1;*/
|
||||||
topicObj.withTbName = 1;
|
/*topicObj.withSchema = 1;*/
|
||||||
topicObj.withSchema = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
|
||||||
|
|
|
@ -51,6 +51,7 @@ target_sources(
|
||||||
|
|
||||||
# tq
|
# tq
|
||||||
"src/tq/tq.c"
|
"src/tq/tq.c"
|
||||||
|
"src/tq/tqExec.c"
|
||||||
"src/tq/tqCommit.c"
|
"src/tq/tqCommit.c"
|
||||||
"src/tq/tqOffset.c"
|
"src/tq/tqOffset.c"
|
||||||
"src/tq/tqPush.c"
|
"src/tq/tqPush.c"
|
||||||
|
|
|
@ -44,21 +44,27 @@ extern "C" {
|
||||||
typedef struct STqOffsetCfg STqOffsetCfg;
|
typedef struct STqOffsetCfg STqOffsetCfg;
|
||||||
typedef struct STqOffsetStore STqOffsetStore;
|
typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
||||||
|
// tqRead
|
||||||
|
|
||||||
struct STqReadHandle {
|
struct STqReadHandle {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
SHashObj* tbIdHash;
|
|
||||||
const SSubmitReq* pMsg;
|
const SSubmitReq* pMsg;
|
||||||
SSubmitBlk* pBlock;
|
SSubmitBlk* pBlock;
|
||||||
SSubmitMsgIter msgIter;
|
SSubmitMsgIter msgIter;
|
||||||
SSubmitBlkIter blkIter;
|
SSubmitBlkIter blkIter;
|
||||||
SMeta* pVnodeMeta;
|
|
||||||
SArray* pColIdList; // SArray<int16_t>
|
SMeta* pVnodeMeta;
|
||||||
int32_t sver;
|
SHashObj* tbIdHash;
|
||||||
int64_t cachedSchemaUid;
|
SArray* pColIdList; // SArray<int16_t>
|
||||||
SSchemaWrapper* pSchemaWrapper;
|
|
||||||
STSchema* pSchema;
|
int32_t cachedSchemaVer;
|
||||||
|
int64_t cachedSchemaUid;
|
||||||
|
SSchemaWrapper* pSchemaWrapper;
|
||||||
|
STSchema* pSchema;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// tqPush
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
|
@ -68,14 +74,15 @@ typedef struct {
|
||||||
SRpcMsg* handle;
|
SRpcMsg* handle;
|
||||||
} STqPushHandle;
|
} STqPushHandle;
|
||||||
|
|
||||||
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int8_t subType;
|
int8_t subType;
|
||||||
int8_t withTbName;
|
// int8_t withTbName;
|
||||||
int8_t withSchema;
|
// int8_t withSchema;
|
||||||
int8_t withTag;
|
// int8_t withTag;
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
SHashObj* pDropTbUid;
|
SHashObj* pDropTbUid;
|
||||||
STqPushHandle pushHandle;
|
STqPushHandle pushHandle;
|
||||||
|
@ -85,15 +92,55 @@ typedef struct {
|
||||||
STqReadHandle* pExecReader[5];
|
STqReadHandle* pExecReader[5];
|
||||||
qTaskInfo_t task[5];
|
qTaskInfo_t task[5];
|
||||||
} STqExec;
|
} STqExec;
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
|
// tqExec
|
||||||
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
|
|
||||||
|
typedef struct {
|
||||||
|
char* qmsg;
|
||||||
|
qTaskInfo_t task[5];
|
||||||
|
} STqExecCol;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t suid;
|
||||||
|
} STqExecTb;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SHashObj* pFilterOutTbUid;
|
||||||
|
} STqExecDb;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t subType;
|
||||||
|
|
||||||
|
STqReadHandle* pExecReader[5];
|
||||||
|
union {
|
||||||
|
STqExecCol execCol;
|
||||||
|
STqExecTb execTb;
|
||||||
|
STqExecDb execDb;
|
||||||
|
} exec;
|
||||||
|
} STqExecHandle;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
// info
|
||||||
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
|
int64_t consumerId;
|
||||||
|
int32_t epoch;
|
||||||
|
|
||||||
|
// reader
|
||||||
|
SWalReadHandle* pWalReader;
|
||||||
|
|
||||||
|
// push
|
||||||
|
STqPushHandle pushHandle;
|
||||||
|
|
||||||
|
// exec
|
||||||
|
STqExecHandle execHandle;
|
||||||
|
} STqHandle;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
char* path;
|
char* path;
|
||||||
SHashObj* pushMgr; // consumerId -> STqExec*
|
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||||
SHashObj* execs; // subKey -> STqExec
|
SHashObj* handles; // subKey -> STqHandle
|
||||||
SHashObj* pStreamTasks;
|
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
TDB* pMetaStore;
|
TDB* pMetaStore;
|
||||||
|
@ -111,6 +158,16 @@ static STqMgmt tqMgmt = {0};
|
||||||
int tqInit();
|
int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
|
// int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
|
||||||
|
// int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
|
||||||
|
|
||||||
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||||
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
|
|
||||||
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum);
|
||||||
|
|
||||||
|
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
|
||||||
|
|
||||||
// tqOffset
|
// tqOffset
|
||||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
||||||
void STqOffsetClose(STqOffsetStore*);
|
void STqOffsetClose(STqOffsetStore*);
|
||||||
|
|
|
@ -51,10 +51,10 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
|
||||||
return strcmp(pKey1, pKey2);
|
return strcmp(pKey1, pKey2);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
|
int32_t tqStoreHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
|
@ -65,7 +65,7 @@ int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, buf, vlen);
|
tEncoderInit(&encoder, buf, vlen);
|
||||||
|
|
||||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
pTq->pWal = pWal;
|
pTq->pWal = pWal;
|
||||||
|
|
||||||
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,30 +134,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
|
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
|
||||||
STqExec exec;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
|
||||||
tDecodeSTqExec(&decoder, &exec);
|
|
||||||
exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
|
||||||
if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
|
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
|
||||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
.reader = exec.pExecReader[i],
|
STqHandle handle;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
tDecodeSTqHandle(&decoder, &handle);
|
||||||
|
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
|
}
|
||||||
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
SReadHandle reader = {
|
||||||
|
.reader = handle.execHandle.pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.pMsgCb = &pTq->pVnode->msgCb,
|
.pMsgCb = &pTq->pVnode->msgCb,
|
||||||
};
|
};
|
||||||
exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle);
|
handle.execHandle.exec.execCol.task[i] =
|
||||||
ASSERT(exec.task[i]);
|
qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader);
|
||||||
|
ASSERT(handle.execHandle.exec.execCol.task[i]);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
handle.execHandle.exec.execDb.pFilterOutTbUid =
|
||||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
|
||||||
exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
|
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTxnClose(&txn) < 0) {
|
if (tdbTxnClose(&txn) < 0) {
|
||||||
|
@ -170,7 +171,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
void tqClose(STQ* pTq) {
|
void tqClose(STQ* pTq) {
|
||||||
if (pTq) {
|
if (pTq) {
|
||||||
taosMemoryFreeClear(pTq->path);
|
taosMemoryFreeClear(pTq->path);
|
||||||
taosHashCleanup(pTq->execs);
|
taosHashCleanup(pTq->handles);
|
||||||
taosHashCleanup(pTq->pStreamTasks);
|
taosHashCleanup(pTq->pStreamTasks);
|
||||||
taosHashCleanup(pTq->pushMgr);
|
taosHashCleanup(pTq->pushMgr);
|
||||||
tdbClose(pTq->pMetaStore);
|
tdbClose(pTq->pMetaStore);
|
||||||
|
@ -179,16 +180,17 @@ void tqClose(STQ* pTq) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
|
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
|
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;
|
/*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/
|
||||||
if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;
|
/*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/
|
||||||
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
|
/*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
|
@ -201,34 +203,64 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
|
||||||
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/
|
||||||
if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/
|
||||||
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
||||||
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
if (tEncodeCStr(pEncoder, pHandle->execHandle.exec.execCol.qmsg) < 0) return -1;
|
||||||
|
}
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
||||||
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.exec.execCol.qmsg) < 0) return -1;
|
||||||
|
}
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->execs, pIter);
|
pIter = taosHashIterate(pTq->handles, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
STqExec* pExec = (STqExec*)pIter;
|
STqHandle* pExec = (STqHandle*)pIter;
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.exec.execCol.task[i], tbUidList, isAdd);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
if (!isAdd) {
|
if (!isAdd) {
|
||||||
int32_t sz = taosArrayGetSize(tbUidList);
|
int32_t sz = taosArrayGetSize(tbUidList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
taosHashPut(pExec->execHandle.exec.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
// tq update id
|
||||||
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -246,7 +278,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
|
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
|
||||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
if (msgType != TDMT_VND_SUBMIT) return 0;
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
STqExec* pExec = NULL;
|
STqHandle* pHandle = NULL;
|
||||||
SSubmitReq* pReq = (SSubmitReq*)msg;
|
SSubmitReq* pReq = (SSubmitReq*)msg;
|
||||||
int32_t workerId = 4;
|
int32_t workerId = 4;
|
||||||
int64_t fetchOffset = ver;
|
int64_t fetchOffset = ver;
|
||||||
|
@ -254,84 +286,27 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pushMgr, pIter);
|
pIter = taosHashIterate(pTq->pushMgr, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
pExec = *(STqExec**)pIter;
|
pHandle = *(STqHandle**)pIter;
|
||||||
|
|
||||||
taosWLockLatch(&pExec->pushHandle.lock);
|
taosWLockLatch(&pHandle->pushHandle.lock);
|
||||||
|
|
||||||
SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle);
|
SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle);
|
||||||
ASSERT(pMsg);
|
ASSERT(pMsg);
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pExec->pushHandle.reqOffset;
|
rsp.reqOffset = pHandle->pushHandle.reqOffset;
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
qTaskInfo_t task = pExec->task[workerId];
|
tqDataExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
|
||||||
ASSERT(task);
|
|
||||||
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* pDataBlock = NULL;
|
|
||||||
uint64_t ts = 0;
|
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
if (pDataBlock == NULL) break;
|
|
||||||
|
|
||||||
ASSERT(pDataBlock->info.rows != 0);
|
|
||||||
ASSERT(pDataBlock->info.numOfCols != 0);
|
|
||||||
|
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
|
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
|
||||||
pRetrieve->useconds = ts;
|
|
||||||
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
|
||||||
pRetrieve->compressed = 0;
|
|
||||||
pRetrieve->completed = 1;
|
|
||||||
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
|
|
||||||
|
|
||||||
// TODO enable compress
|
|
||||||
int32_t actualLen = 0;
|
|
||||||
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
|
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
|
||||||
ASSERT(actualLen <= dataStrLen);
|
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
|
||||||
rsp.blockNum++;
|
|
||||||
}
|
|
||||||
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
|
||||||
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
|
||||||
tqReadHandleSetMsg(pReader, pReq, 0);
|
|
||||||
while (tqNextDataBlock(pReader)) {
|
|
||||||
SSDataBlock block = {0};
|
|
||||||
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
|
|
||||||
&block.info.numOfCols) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
|
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
|
||||||
/*pRetrieve->useconds = 0;*/
|
|
||||||
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
|
||||||
pRetrieve->compressed = 0;
|
|
||||||
pRetrieve->completed = 1;
|
|
||||||
pRetrieve->numOfRows = htonl(block.info.rows);
|
|
||||||
|
|
||||||
// TODO enable compress
|
|
||||||
int32_t actualLen = 0;
|
|
||||||
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
|
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
|
||||||
ASSERT(actualLen <= dataStrLen);
|
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
|
||||||
rsp.blockNum++;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
// TODO
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rsp.blockNum == 0) {
|
if (rsp.blockNum == 0) {
|
||||||
taosWUnLockLatch(&pExec->pushHandle.lock);
|
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,8 +323,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||||
((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch;
|
((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
|
||||||
((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId;
|
((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||||
|
@ -357,11 +332,11 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
|
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
|
||||||
tmsgSendRsp(&resp);
|
tmsgSendRsp(&resp);
|
||||||
|
|
||||||
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
|
atomic_store_ptr(&pHandle->pushHandle.handle, NULL);
|
||||||
taosWUnLockLatch(&pExec->pushHandle.lock);
|
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
||||||
|
|
||||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
|
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
||||||
rsp.reqOffset, rsp.rspOffset);
|
rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
// TODO destroy
|
// TODO destroy
|
||||||
|
@ -415,12 +390,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(pExec);
|
ASSERT(pHandle);
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
|
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
while (consumerEpoch < reqEpoch) {
|
while (consumerEpoch < reqEpoch) {
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
|
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
||||||
|
@ -428,49 +403,51 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
walSetReaderCapacity(pExec->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
rsp.withSchema = pExec->withSchema;
|
|
||||||
|
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
int8_t withTbName = pExec->withTbName;
|
rsp.withTbName = pReq->withTbName;
|
||||||
if (pReq->withTbName != -1) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
withTbName = pReq->withTbName;
|
rsp.withSchema = false;
|
||||||
|
rsp.withTag = false;
|
||||||
|
} else {
|
||||||
|
rsp.withSchema = true;
|
||||||
|
rsp.withTag = false;
|
||||||
}
|
}
|
||||||
rsp.withTbName = withTbName;
|
|
||||||
|
/*int8_t withTbName = pExec->withTbName;*/
|
||||||
|
/*if (pReq->withTbName != -1) {*/
|
||||||
|
/*withTbName = pReq->withTbName;*/
|
||||||
|
/*}*/
|
||||||
|
/*rsp.withTbName = withTbName;*/
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pExec->epoch);
|
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pExec->pWalReader->mutex);
|
if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
|
||||||
|
// TODO add push mgr
|
||||||
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset);
|
|
||||||
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
|
tqDebug("tmq poll vg %d: offset %ld msgType %d", pTq->pVnode->config.vgId, fetchOffset,
|
||||||
ASSERT(walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum) == 0);
|
pHeadWithCkSum->head.msgType);
|
||||||
} else {
|
|
||||||
ASSERT(walFetchBody(pExec->pWalReader, &pHeadWithCkSum) == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalReadHead* pHead = &pHeadWithCkSum->head;
|
SWalReadHead* pHead = &pHeadWithCkSum->head;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
|
tqDebug("tmq poll vg %d: offset %ld msgType %d", pTq->pVnode->config.vgId, fetchOffset,
|
||||||
|
pHeadWithCkSum->head.msgType);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SWalReadHead* pHead;
|
SWalReadHead* pHead;
|
||||||
|
@ -511,122 +488,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
// table subscribe
|
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
|
||||||
qTaskInfo_t task = pExec->task[workerId];
|
|
||||||
ASSERT(task);
|
|
||||||
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* pDataBlock = NULL;
|
|
||||||
uint64_t ts = 0;
|
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
if (pDataBlock == NULL) break;
|
|
||||||
|
|
||||||
ASSERT(pDataBlock->info.rows != 0);
|
tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId);
|
||||||
ASSERT(pDataBlock->info.numOfCols != 0);
|
} else {
|
||||||
|
// TODO
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
|
ASSERT(0);
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
|
||||||
pRetrieve->useconds = ts;
|
|
||||||
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
|
||||||
pRetrieve->compressed = 0;
|
|
||||||
pRetrieve->completed = 1;
|
|
||||||
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
|
|
||||||
|
|
||||||
// TODO enable compress
|
|
||||||
int32_t actualLen = 0;
|
|
||||||
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
|
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
|
||||||
ASSERT(actualLen <= dataStrLen);
|
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
|
||||||
|
|
||||||
if (pExec->withSchema) {
|
|
||||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
|
||||||
taosArrayPush(rsp.blockSchema, &pSW);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (withTbName) {
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
|
||||||
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
|
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
char* tbName = strdup(mr.me.name);
|
|
||||||
taosArrayPush(rsp.blockTbName, &tbName);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.blockNum++;
|
|
||||||
}
|
|
||||||
// db subscribe
|
|
||||||
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
|
||||||
rsp.withSchema = 1;
|
|
||||||
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
|
||||||
tqReadHandleSetMsg(pReader, pCont, 0);
|
|
||||||
while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
|
|
||||||
SSDataBlock block = {0};
|
|
||||||
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
|
|
||||||
&block.info.numOfCols) < 0) {
|
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
|
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
|
||||||
/*pRetrieve->useconds = 0;*/
|
|
||||||
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
|
||||||
pRetrieve->compressed = 0;
|
|
||||||
pRetrieve->completed = 1;
|
|
||||||
pRetrieve->numOfRows = htonl(block.info.rows);
|
|
||||||
|
|
||||||
// TODO enable compress
|
|
||||||
int32_t actualLen = 0;
|
|
||||||
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
|
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
|
||||||
ASSERT(actualLen <= dataStrLen);
|
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
|
||||||
if (withTbName) {
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
|
||||||
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
char* tbName = strdup(mr.me.name);
|
|
||||||
taosArrayPush(rsp.blockTbName, &tbName);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
}
|
|
||||||
|
|
||||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
|
||||||
taosArrayPush(rsp.blockSchema, &pSW);
|
|
||||||
|
|
||||||
rsp.blockNum++;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO batch optimization:
|
// TODO batch optimization:
|
||||||
// TODO continue scan until meeting batch requirement
|
// TODO continue scan until meeting batch requirement
|
||||||
if (rsp.blockNum != 0) break;
|
if (rsp.blockNum > 0 /* threshold */) {
|
||||||
rsp.skipLogNum++;
|
break;
|
||||||
fetchOffset++;
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pHeadWithCkSum);
|
taosMemoryFree(pHeadWithCkSum);
|
||||||
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
||||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
||||||
|
|
||||||
if (rsp.blockNum != 0)
|
rsp.rspOffset = fetchOffset;
|
||||||
rsp.rspOffset = fetchOffset;
|
|
||||||
else
|
|
||||||
rsp.rspOffset = fetchOffset - 1;
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
|
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
|
||||||
void* buf = rpcMallocCont(tlen);
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
@ -642,13 +523,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||||
|
|
||||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
SRpcMsg resp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.pCont = buf,
|
||||||
|
.contLen = tlen,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
tmsgSendRsp(&resp);
|
tmsgSendRsp(&resp);
|
||||||
|
|
||||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||||
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
// TODO destroy
|
// TODO wrap in destroy func
|
||||||
taosArrayDestroy(rsp.blockData);
|
taosArrayDestroy(rsp.blockData);
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
taosArrayDestroy(rsp.blockDataLen);
|
||||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
@ -660,7 +546,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
TXN txn;
|
TXN txn;
|
||||||
|
@ -689,63 +575,59 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
// todo lock
|
// todo lock
|
||||||
STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
|
||||||
if (pExec == NULL) {
|
if (pHandle == NULL) {
|
||||||
ASSERT(req.oldConsumerId == -1);
|
ASSERT(req.oldConsumerId == -1);
|
||||||
ASSERT(req.newConsumerId != -1);
|
ASSERT(req.newConsumerId != -1);
|
||||||
STqExec exec = {0};
|
STqHandle tqHandle = {0};
|
||||||
pExec = &exec;
|
pHandle = &tqHandle;
|
||||||
/*taosInitRWLatch(&pExec->lock);*/
|
/*taosInitRWLatch(&pExec->lock);*/
|
||||||
|
|
||||||
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
pExec->consumerId = req.newConsumerId;
|
pHandle->consumerId = req.newConsumerId;
|
||||||
pExec->epoch = -1;
|
pHandle->epoch = -1;
|
||||||
|
|
||||||
pExec->subType = req.subType;
|
pHandle->execHandle.subType = req.subType;
|
||||||
pExec->withTbName = req.withTbName;
|
/*pExec->withTbName = req.withTbName;*/
|
||||||
pExec->withSchema = req.withSchema;
|
/*pExec->withSchema = req.withSchema;*/
|
||||||
pExec->withTag = req.withTag;
|
/*pExec->withTag = req.withTag;*/
|
||||||
|
|
||||||
pExec->qmsg = req.qmsg;
|
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
|
||||||
req.qmsg = NULL;
|
req.qmsg = NULL;
|
||||||
|
|
||||||
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
|
}
|
||||||
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pExec->pExecReader[i],
|
.reader = pHandle->execHandle.pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.pMsgCb = &pTq->pVnode->msgCb,
|
.pMsgCb = &pTq->pVnode->msgCb,
|
||||||
};
|
};
|
||||||
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
pHandle->execHandle.exec.execCol.task[i] =
|
||||||
ASSERT(pExec->task[i]);
|
qCreateStreamExecTaskInfo(pHandle->execHandle.exec.execCol.qmsg, &handle);
|
||||||
|
ASSERT(pHandle->execHandle.exec.execCol.task[i]);
|
||||||
}
|
}
|
||||||
} else {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
pHandle->execHandle.exec.execDb.pFilterOutTbUid =
|
||||||
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
|
|
||||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else {
|
} else {
|
||||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||||
// TODO handle qmsg and exec modification
|
// TODO handle qmsg and exec modification
|
||||||
atomic_store_32(&pExec->epoch, -1);
|
atomic_store_32(&pHandle->epoch, -1);
|
||||||
atomic_store_64(&pExec->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_add_fetch_32(&pExec->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
|
||||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tqStoreHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
|
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRsp) {
|
||||||
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
|
if (buf == NULL) return -1;
|
||||||
|
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||||
|
pRetrieve->useconds = 0;
|
||||||
|
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
||||||
|
pRetrieve->compressed = 0;
|
||||||
|
pRetrieve->completed = 1;
|
||||||
|
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||||
|
|
||||||
|
// TODO enable compress
|
||||||
|
int32_t actualLen = 0;
|
||||||
|
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
|
||||||
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
|
ASSERT(actualLen <= dataStrLen);
|
||||||
|
taosArrayPush(pRsp->blockDataLen, &actualLen);
|
||||||
|
taosArrayPush(pRsp->blockData, &buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataBlkRsp* pRsp) {
|
||||||
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||||
|
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) {
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||||
|
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
|
||||||
|
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char* tbName = strdup(mr.me.name);
|
||||||
|
taosArrayPush(pRsp->blockTbName, &tbName);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) {
|
||||||
|
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
qTaskInfo_t task = pExec->exec.execCol.task[workerId];
|
||||||
|
ASSERT(task);
|
||||||
|
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pDataBlock = NULL;
|
||||||
|
uint64_t ts = 0;
|
||||||
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
if (pDataBlock == NULL) break;
|
||||||
|
|
||||||
|
ASSERT(pDataBlock->info.rows != 0);
|
||||||
|
ASSERT(pDataBlock->info.numOfCols != 0);
|
||||||
|
|
||||||
|
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
|
||||||
|
}
|
||||||
|
pRsp->blockNum++;
|
||||||
|
}
|
||||||
|
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
pRsp->withSchema = 1;
|
||||||
|
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
||||||
|
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||||
|
while (tqNextDataBlock(pReader)) {
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
|
||||||
|
&block.info.numOfCols) < 0) {
|
||||||
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
tqAddBlockDataToRsp(&block, pRsp);
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
|
||||||
|
}
|
||||||
|
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
|
||||||
|
pRsp->blockNum++;
|
||||||
|
}
|
||||||
|
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
pRsp->withSchema = 1;
|
||||||
|
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
||||||
|
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||||
|
while (tqNextDataBlockFilterOut(pReader, pExec->exec.execDb.pFilterOutTbUid)) {
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
|
||||||
|
&block.info.numOfCols) < 0) {
|
||||||
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
tqAddBlockDataToRsp(&block, pRsp);
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId);
|
||||||
|
}
|
||||||
|
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
|
||||||
|
pRsp->blockNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pRsp->blockNum == 0) {
|
||||||
|
pRsp->skipLogNum++;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
|
@ -15,6 +15,48 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) {
|
||||||
|
int32_t code = 0;
|
||||||
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) {
|
||||||
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
|
||||||
|
pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
|
*fetchOffset = offset - 1;
|
||||||
|
code = -1;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
|
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
|
||||||
|
|
||||||
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
*fetchOffset = offset;
|
||||||
|
code = -1;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
*fetchOffset = offset;
|
||||||
|
code = 0;
|
||||||
|
goto END;
|
||||||
|
} else {
|
||||||
|
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
|
||||||
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
*fetchOffset = offset;
|
||||||
|
code = -1;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
offset++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
END:
|
||||||
|
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
|
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
|
||||||
if (pReadHandle == NULL) {
|
if (pReadHandle == NULL) {
|
||||||
|
@ -24,7 +66,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
pReadHandle->pMsg = NULL;
|
pReadHandle->pMsg = NULL;
|
||||||
pReadHandle->ver = -1;
|
pReadHandle->ver = -1;
|
||||||
pReadHandle->pColIdList = NULL;
|
pReadHandle->pColIdList = NULL;
|
||||||
pReadHandle->sver = -1;
|
pReadHandle->cachedSchemaVer = -1;
|
||||||
pReadHandle->cachedSchemaUid = -1;
|
pReadHandle->cachedSchemaUid = -1;
|
||||||
pReadHandle->pSchema = NULL;
|
pReadHandle->pSchema = NULL;
|
||||||
pReadHandle->pSchemaWrapper = NULL;
|
pReadHandle->pSchemaWrapper = NULL;
|
||||||
|
@ -88,11 +130,11 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
// TODO set to real sversion
|
// TODO set to real sversion
|
||||||
/*int32_t sversion = 1;*/
|
/*int32_t sversion = 1;*/
|
||||||
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
||||||
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
if (pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
||||||
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
||||||
if (pHandle->pSchema == NULL) {
|
if (pHandle->pSchema == NULL) {
|
||||||
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
|
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
|
||||||
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->sver);
|
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->cachedSchemaVer);
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -102,12 +144,12 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
|
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
|
||||||
if (pHandle->pSchemaWrapper == NULL) {
|
if (pHandle->pSchemaWrapper == NULL) {
|
||||||
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
|
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
|
||||||
pHandle->msgIter.suid, pHandle->sver);
|
pHandle->msgIter.suid, pHandle->cachedSchemaVer);
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pHandle->sver = sversion;
|
pHandle->cachedSchemaVer = sversion;
|
||||||
pHandle->cachedSchemaUid = pHandle->msgIter.suid;
|
pHandle->cachedSchemaUid = pHandle->msgIter.suid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -182,7 +182,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
@ -223,7 +223,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
@ -279,7 +279,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
|
@ -343,7 +343,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
|
@ -427,7 +427,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
|
|
@ -195,7 +195,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
@ -272,7 +272,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
@ -358,8 +358,8 @@ class TDTestCase:
|
||||||
topicName1 = 'topic_db60'
|
topicName1 = 'topic_db60'
|
||||||
topicName2 = 'topic_db61'
|
topicName2 = 'topic_db61'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
@ -443,8 +443,8 @@ class TDTestCase:
|
||||||
topicName1 = 'topic_db60'
|
topicName1 = 'topic_db60'
|
||||||
topicName2 = 'topic_db61'
|
topicName2 = 'topic_db61'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
|
|
@ -183,7 +183,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
@ -261,7 +261,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
@ -339,7 +339,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
@ -411,7 +411,7 @@ class TDTestCase:
|
||||||
tdLog.info("create topics from db")
|
tdLog.info("create topics from db")
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
|
|
Loading…
Reference in New Issue