feat(tmq): get_db api
This commit is contained in:
parent
58a0691abe
commit
e1fd4a6640
|
@ -24,6 +24,7 @@ static void msg_process(TAOS_RES* msg) {
|
|||
char buf[1024];
|
||||
/*memset(buf, 0, 1024);*/
|
||||
printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
printf("db: %s\n", tmq_get_db_name(msg));
|
||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
|
|
|
@ -144,8 +144,8 @@ DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *nam
|
|||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
|
||||
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
|
||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
|
||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
|
||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||
|
||||
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
||||
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
||||
|
@ -269,6 +269,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
|
|||
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
|
||||
|
||||
|
|
|
@ -2203,10 +2203,8 @@ typedef struct {
|
|||
int64_t newConsumerId;
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int8_t subType;
|
||||
// int8_t withTbName;
|
||||
// int8_t withSchema;
|
||||
// int8_t withTag;
|
||||
char* qmsg;
|
||||
char* qmsg;
|
||||
int64_t suid;
|
||||
} SMqRebVgReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
|
||||
|
@ -2217,11 +2215,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
|
|||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
||||
tlen += taosEncodeString(buf, pReq->subKey);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->subType);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withSchema);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withTag);
|
||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
tlen += taosEncodeFixedI64(buf, pReq->suid);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
@ -2233,11 +2230,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
|||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->subKey);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->subType);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withSchema);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withTag);
|
||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->suid);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
@ -2471,7 +2467,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
|||
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t isSchemaAdaptive;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
SSchemaWrapper schema;
|
||||
} SMqSubTopicEp;
|
||||
|
@ -2479,7 +2475,7 @@ typedef struct {
|
|||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
|
||||
tlen += taosEncodeString(buf, pTopicEp->db);
|
||||
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -2492,7 +2488,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
|
|||
|
||||
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
||||
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->db);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
||||
|
|
|
@ -209,7 +209,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||
|
|
|
@ -191,6 +191,7 @@ typedef struct SRequestSendRecvBody {
|
|||
typedef struct {
|
||||
int8_t resType;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
SSchemaWrapper schema;
|
||||
int32_t resIter;
|
||||
|
|
|
@ -143,6 +143,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
// subscribe info
|
||||
char* topicName;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
|
||||
SArray* vgs; // SArray<SMqClientVg>
|
||||
|
||||
|
@ -1039,6 +1040,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
|||
topic.schema = pTopicEp->schema;
|
||||
taosHashClear(pHash);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
|
||||
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||
|
@ -1283,7 +1285,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
|||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
|
||||
|
@ -1506,6 +1509,15 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
|
|||
}
|
||||
}
|
||||
|
||||
const char* tmq_get_db_name(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
return strchr(pRspObj->db, '.') + 1;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
|
|
|
@ -422,6 +422,7 @@ typedef struct {
|
|||
char* ast;
|
||||
char* physicalPlan;
|
||||
SSchemaWrapper schema;
|
||||
int64_t stbUid;
|
||||
// int32_t refConsumerCnt;
|
||||
} SMqTopicObj;
|
||||
|
||||
|
@ -535,7 +536,7 @@ typedef struct {
|
|||
} SMqRebOutputObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
|
|
|
@ -306,6 +306,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
ASSERT(pTopic);
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
if (topicEp.schema.nCols) {
|
||||
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
|
||||
|
|
|
@ -121,9 +121,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
|||
req.vgId = pRebVg->pVgEp->vgId;
|
||||
req.qmsg = pRebVg->pVgEp->qmsg;
|
||||
req.subType = pSub->subType;
|
||||
/*req.withTbName = pSub->withTbName;*/
|
||||
/*req.withSchema = pSub->withSchema;*/
|
||||
/*req.withTag = pSub->withTag;*/
|
||||
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
|
||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
||||
|
|
|
@ -375,13 +375,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
taosMemoryFree(topicObj.sql);
|
||||
return -1;
|
||||
}
|
||||
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||
/*topicObj.ast = NULL;*/
|
||||
/*topicObj.astLen = 0;*/
|
||||
/*topicObj.physicalPlan = NULL;*/
|
||||
/*topicObj.withTbName = 1;*/
|
||||
/*topicObj.withSchema = 1;*/
|
||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
}
|
||||
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||
/*topicObj.ast = NULL;*/
|
||||
/*topicObj.astLen = 0;*/
|
||||
/*topicObj.physicalPlan = NULL;*/
|
||||
/*topicObj.withTbName = 1;*/
|
||||
/*topicObj.withSchema = 1;*/
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
|
|
|
@ -264,14 +264,13 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
/*pExec->withSchema = req.withSchema;*/
|
||||
/*pExec->withTag = req.withTag;*/
|
||||
|
||||
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
|
||||
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle handle = {
|
||||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
|
@ -286,6 +285,13 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->execHandle.exec.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
int64_t suid = 0;
|
||||
/*pHandle->execHandle.exec.execTb.suid = req.suid;*/
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int16_t));
|
||||
tsdbGetAllTableList(pTq->pVnode->pMeta, suid, tbUidList);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
}
|
||||
}
|
||||
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue