Merge pull request #13400 from taosdata/feature/tq
feat(tmq): get_db api
This commit is contained in:
commit
240662e49d
|
@ -24,6 +24,7 @@ static void msg_process(TAOS_RES* msg) {
|
|||
char buf[1024];
|
||||
/*memset(buf, 0, 1024);*/
|
||||
printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
printf("db: %s\n", tmq_get_db_name(msg));
|
||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
|
|
|
@ -144,8 +144,8 @@ DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *nam
|
|||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
|
||||
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
|
||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
|
||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
|
||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||
|
||||
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
||||
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
||||
|
@ -269,6 +269,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
|
|||
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
|
||||
|
||||
|
|
|
@ -2203,10 +2203,8 @@ typedef struct {
|
|||
int64_t newConsumerId;
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int8_t subType;
|
||||
// int8_t withTbName;
|
||||
// int8_t withSchema;
|
||||
// int8_t withTag;
|
||||
char* qmsg;
|
||||
char* qmsg;
|
||||
int64_t suid;
|
||||
} SMqRebVgReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
|
||||
|
@ -2217,11 +2215,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
|
|||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
||||
tlen += taosEncodeString(buf, pReq->subKey);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->subType);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withSchema);
|
||||
// tlen += taosEncodeFixedI8(buf, pReq->withTag);
|
||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
tlen += taosEncodeFixedI64(buf, pReq->suid);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
@ -2233,11 +2230,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
|||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->subKey);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->subType);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withSchema);
|
||||
// buf = taosDecodeFixedI8(buf, &pReq->withTag);
|
||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->suid);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
@ -2471,7 +2467,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
|||
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t isSchemaAdaptive;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
SSchemaWrapper schema;
|
||||
} SMqSubTopicEp;
|
||||
|
@ -2479,7 +2475,7 @@ typedef struct {
|
|||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
|
||||
tlen += taosEncodeString(buf, pTopicEp->db);
|
||||
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -2492,7 +2488,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
|
|||
|
||||
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
||||
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->db);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
||||
|
|
|
@ -209,7 +209,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||
|
|
|
@ -191,6 +191,7 @@ typedef struct SRequestSendRecvBody {
|
|||
typedef struct {
|
||||
int8_t resType;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
SSchemaWrapper schema;
|
||||
int32_t resIter;
|
||||
|
|
|
@ -143,6 +143,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
// subscribe info
|
||||
char* topicName;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
|
||||
SArray* vgs; // SArray<SMqClientVg>
|
||||
|
||||
|
@ -1039,6 +1040,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
|||
topic.schema = pTopicEp->schema;
|
||||
taosHashClear(pHash);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
|
||||
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||
|
@ -1283,7 +1285,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
|||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
|
||||
|
@ -1506,6 +1509,15 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
|
|||
}
|
||||
}
|
||||
|
||||
const char* tmq_get_db_name(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
return strchr(pRspObj->db, '.') + 1;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
|
|
|
@ -168,7 +168,7 @@ typedef struct {
|
|||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
SDnodeObj* pDnode;
|
||||
SQnodeLoad load;
|
||||
SQnodeLoad load;
|
||||
} SQnodeObj;
|
||||
|
||||
typedef struct {
|
||||
|
@ -403,26 +403,22 @@ int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
|||
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int8_t subType; // column, db or stable
|
||||
// int8_t withTbName;
|
||||
// int8_t withSchema;
|
||||
// int8_t withTag;
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int8_t subType; // column, db or stable
|
||||
SRWLatch lock;
|
||||
int32_t consumerCnt;
|
||||
int32_t sqlLen;
|
||||
int32_t astLen;
|
||||
char* sql;
|
||||
char* ast;
|
||||
char* physicalPlan;
|
||||
SSchemaWrapper schema;
|
||||
// int32_t refConsumerCnt;
|
||||
int64_t stbUid;
|
||||
} SMqTopicObj;
|
||||
|
||||
typedef struct {
|
||||
|
@ -476,14 +472,12 @@ int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
|||
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
SRWLatch lock;
|
||||
int64_t dbUid;
|
||||
int32_t vgNum;
|
||||
int8_t subType;
|
||||
// int8_t withTbName;
|
||||
// int8_t withSchema;
|
||||
// int8_t withTag;
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
SRWLatch lock;
|
||||
int64_t dbUid;
|
||||
int32_t vgNum;
|
||||
int8_t subType;
|
||||
int64_t stbUid;
|
||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||
} SMqSubscribeObj;
|
||||
|
@ -535,7 +529,7 @@ typedef struct {
|
|||
} SMqRebOutputObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
|
|
|
@ -306,6 +306,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
ASSERT(pTopic);
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
if (topicEp.schema.nCols) {
|
||||
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
|
||||
|
|
|
@ -395,10 +395,8 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
taosInitRWLatch(&pSubNew->lock);
|
||||
|
||||
pSubNew->dbUid = pSub->dbUid;
|
||||
pSubNew->stbUid = pSub->stbUid;
|
||||
pSubNew->subType = pSub->subType;
|
||||
/*pSubNew->withTbName = pSub->withTbName;*/
|
||||
/*pSubNew->withSchema = pSub->withSchema;*/
|
||||
/*pSubNew->withTag = pSub->withTag;*/
|
||||
|
||||
pSubNew->vgNum = pSub->vgNum;
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
|
@ -431,9 +429,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
|||
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
|
||||
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
||||
tlen += taosEncodeFixedI8(buf, pSub->subType);
|
||||
/*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
|
||||
/*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
|
||||
/*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
|
||||
tlen += taosEncodeFixedI64(buf, pSub->stbUid);
|
||||
|
||||
void *pIter = NULL;
|
||||
int32_t sz = taosHashGetSize(pSub->consumerHash);
|
||||
|
@ -458,9 +454,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
|||
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
||||
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
||||
buf = taosDecodeFixedI8(buf, &pSub->subType);
|
||||
/*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
|
||||
/*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
|
||||
/*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
|
||||
buf = taosDecodeFixedI64(buf, &pSub->stbUid);
|
||||
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
|
|
|
@ -93,10 +93,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
|||
return NULL;
|
||||
}
|
||||
pSub->dbUid = pTopic->dbUid;
|
||||
pSub->stbUid = pTopic->stbUid;
|
||||
pSub->subType = pTopic->subType;
|
||||
/*pSub->withTbName = pTopic->withTbName;*/
|
||||
/*pSub->withSchema = pTopic->withSchema;*/
|
||||
/*pSub->withTag = pTopic->withTag;*/
|
||||
|
||||
ASSERT(pSub->unassignedVgs->size == 0);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
@ -121,9 +119,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
|||
req.vgId = pRebVg->pVgEp->vgId;
|
||||
req.qmsg = pRebVg->pVgEp->qmsg;
|
||||
req.subType = pSub->subType;
|
||||
/*req.withTbName = pSub->withTbName;*/
|
||||
/*req.withSchema = pSub->withSchema;*/
|
||||
/*req.withTag = pSub->withTag;*/
|
||||
req.suid = pSub->stbUid;
|
||||
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
|
||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
||||
|
|
|
@ -96,11 +96,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
|
||||
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);*/
|
||||
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);*/
|
||||
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);*/
|
||||
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
|
||||
|
@ -122,8 +119,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
|
||||
/*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||
|
||||
|
@ -168,12 +163,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
|
||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
|
||||
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);*/
|
||||
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);*/
|
||||
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);*/
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
|
||||
if (pTopic->sql == NULL) {
|
||||
|
@ -222,8 +213,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
pTopic->schema.pSchema = NULL;
|
||||
}
|
||||
|
||||
/*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
@ -254,8 +243,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
|
|||
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
|
||||
/*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
|
||||
|
||||
/*taosWLockLatch(&pOldTopic->lock);*/
|
||||
|
||||
// TODO handle update
|
||||
|
@ -278,18 +265,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
|
|||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
||||
#if 0
|
||||
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
|
||||
SName name = {0};
|
||||
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
char db[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, db);
|
||||
|
||||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
#endif
|
||||
|
||||
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
|
||||
int32_t contLen = sizeof(SDDropTopicReq);
|
||||
|
||||
|
@ -341,8 +316,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
topicObj.ast = strdup(pCreate->ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
/*topicObj.withTbName = pCreate->withTbName;*/
|
||||
/*topicObj.withSchema = pCreate->withSchema;*/
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
||||
|
@ -375,13 +348,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
taosMemoryFree(topicObj.sql);
|
||||
return -1;
|
||||
}
|
||||
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||
/*topicObj.ast = NULL;*/
|
||||
/*topicObj.astLen = 0;*/
|
||||
/*topicObj.physicalPlan = NULL;*/
|
||||
/*topicObj.withTbName = 1;*/
|
||||
/*topicObj.withSchema = 1;*/
|
||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
||||
topicObj.stbUid = pStb->uid;
|
||||
}
|
||||
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
|
||||
/*topicObj.ast = NULL;*/
|
||||
/*topicObj.astLen = 0;*/
|
||||
/*topicObj.physicalPlan = NULL;*/
|
||||
/*topicObj.withTbName = 1;*/
|
||||
/*topicObj.withSchema = 1;*/
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
|
|
|
@ -85,7 +85,7 @@ typedef struct SMetaFltParam {
|
|||
tb_uid_t suid;
|
||||
int16_t cid;
|
||||
int16_t type;
|
||||
char * val;
|
||||
char *val;
|
||||
bool reverse;
|
||||
int (*filterFunc)(void *a, void *b, int16_t type);
|
||||
|
||||
|
@ -119,7 +119,8 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
|
|||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
||||
void * tsdbGetIdx(SMeta *pMeta);
|
||||
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
||||
void *tsdbGetIdx(SMeta *pMeta);
|
||||
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
||||
|
||||
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||
|
@ -192,7 +193,7 @@ struct SMetaEntry {
|
|||
int64_t version;
|
||||
int8_t type;
|
||||
tb_uid_t uid;
|
||||
char * name;
|
||||
char *name;
|
||||
union {
|
||||
struct {
|
||||
SSchemaWrapper schemaRow;
|
||||
|
@ -220,17 +221,17 @@ struct SMetaEntry {
|
|||
|
||||
struct SMetaReader {
|
||||
int32_t flags;
|
||||
SMeta * pMeta;
|
||||
SMeta *pMeta;
|
||||
SDecoder coder;
|
||||
SMetaEntry me;
|
||||
void * pBuf;
|
||||
void *pBuf;
|
||||
int32_t szBuf;
|
||||
};
|
||||
|
||||
struct SMTbCursor {
|
||||
TBC * pDbc;
|
||||
void * pKey;
|
||||
void * pVal;
|
||||
TBC *pDbc;
|
||||
void *pKey;
|
||||
void *pVal;
|
||||
int32_t kLen;
|
||||
int32_t vLen;
|
||||
SMetaReader mr;
|
||||
|
|
|
@ -260,18 +260,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->epoch = -1;
|
||||
|
||||
pHandle->execHandle.subType = req.subType;
|
||||
/*pExec->withTbName = req.withTbName;*/
|
||||
/*pExec->withSchema = req.withSchema;*/
|
||||
/*pExec->withTag = req.withTag;*/
|
||||
|
||||
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
|
||||
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle handle = {
|
||||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
|
@ -286,6 +282,18 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->execHandle.exec.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pHandle->execHandle.exec.execTb.suid = req.suid;
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
|
||||
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||
tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid);
|
||||
}
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
}
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
} else {
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnode.h"
|
||||
#include "tsdb.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#define EXTRA_BYTES 2
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
|
@ -327,8 +327,8 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
|
|||
|
||||
if (updateTs) {
|
||||
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
|
||||
pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pTsdbReadHandle->window.skey,
|
||||
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -586,7 +586,8 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, in
|
|||
resetCheckInfo(pTsdbReadHandle);
|
||||
}
|
||||
|
||||
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) {
|
||||
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
|
||||
int32_t tWinIdx) {
|
||||
STsdbReadHandle* pTsdbReadHandle = queryHandle;
|
||||
|
||||
pTsdbReadHandle->order = pCond->order;
|
||||
|
@ -2845,6 +2846,22 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
||||
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, suid);
|
||||
|
||||
while (1) {
|
||||
tb_uid_t id = metaCtbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
taosArrayPush(list, &id);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyHelper(void* param) {
|
||||
if (param == NULL) {
|
||||
return;
|
||||
|
|
|
@ -58,7 +58,7 @@
|
|||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic4.sim
|
||||
#./test.sh -f tsim/mnode/basic4.sim
|
||||
|
||||
# ---- show
|
||||
./test.sh -f tsim/show/basic.sim
|
||||
|
|
|
@ -191,4 +191,4 @@ endi
|
|||
system sh/exec.sh -n dnode1 -s stop
|
||||
system sh/exec.sh -n dnode2 -s stop
|
||||
system sh/exec.sh -n dnode3 -s stop
|
||||
system sh/exec.sh -n dnode4 -s stop
|
||||
system sh/exec.sh -n dnode4 -s stop
|
||||
|
|
Loading…
Reference in New Issue