Merge pull request #10368 from taosdata/feature/tq
new offset management
This commit is contained in:
commit
3662e6b0ce
|
@ -66,7 +66,7 @@ int32_t init_env() {
|
|||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic");
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
|
@ -91,6 +91,10 @@ int32_t create_topic() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
|
||||
printf("commit %d\n", resp);
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
@ -103,6 +107,7 @@ tmq_t* build_consumer() {
|
|||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
|
||||
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
|
||||
return tmq;
|
||||
}
|
||||
|
@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
}
|
||||
|
||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
static const int MIN_COMMIT_COUNT = 1000;
|
||||
static const int MIN_COMMIT_COUNT = 1;
|
||||
|
||||
int msg_count = 0;
|
||||
tmq_resp_err_t err;
|
||||
|
@ -214,6 +219,6 @@ int main(int argc, char* argv[]) {
|
|||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*perf_loop(tmq, topic_list);*/
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
/*sync_consume_loop(tmq, topic_list);*/
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
sync_consume_loop(tmq, topic_list);
|
||||
}
|
||||
|
|
|
@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
|
|||
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||
|
||||
enum tmq_resp_err_t {
|
||||
TMQ_RESP_ERR__FAIL = -1,
|
||||
TMQ_RESP_ERR__SUCCESS = 0,
|
||||
TMQ_RESP_ERR__FAIL = 1,
|
||||
};
|
||||
|
||||
typedef enum tmq_resp_err_t tmq_resp_err_t;
|
||||
|
@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
|||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
|
||||
#if 0
|
||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics);
|
||||
#endif
|
||||
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
||||
|
@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
|
|||
#if 0
|
||||
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
|
||||
#endif
|
||||
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
|
||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||
|
||||
enum tmq_conf_res_t {
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#ifndef TDENGINE_COMMON_H
|
||||
#define TDENGINE_COMMON_H
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
@ -43,14 +42,16 @@ extern "C" {
|
|||
// int16_t bytes;
|
||||
// } SSchema;
|
||||
|
||||
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
|
||||
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
|
||||
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
|
||||
enum {
|
||||
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
||||
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
|
||||
TMQ_CONF__RESET_OFFSET__NONE = -3,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
uint32_t numOfTables;
|
||||
SArray *pGroupList;
|
||||
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
|
||||
SArray* pGroupList;
|
||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||
} STableGroupInfo;
|
||||
|
||||
typedef struct SColumnDataAgg {
|
||||
|
@ -79,14 +80,14 @@ typedef struct SConstantItem {
|
|||
|
||||
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
|
||||
typedef struct SSDataBlock {
|
||||
SColumnDataAgg *pBlockAgg;
|
||||
SArray *pDataBlock; // SArray<SColumnInfoData>
|
||||
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
||||
SDataBlockInfo info;
|
||||
SColumnDataAgg* pBlockAgg;
|
||||
SArray* pDataBlock; // SArray<SColumnInfoData>
|
||||
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
||||
SDataBlockInfo info;
|
||||
} SSDataBlock;
|
||||
|
||||
typedef struct SVarColAttr {
|
||||
int32_t *offset; // start position for each entry in the list
|
||||
int32_t* offset; // start position for each entry in the list
|
||||
uint32_t length; // used buffer size that contain the valid data
|
||||
uint32_t allocLen; // allocated buffer size
|
||||
} SVarColAttr;
|
||||
|
@ -94,11 +95,11 @@ typedef struct SVarColAttr {
|
|||
// pBlockAgg->numOfNull == info.rows, all data are null
|
||||
// pBlockAgg->numOfNull == 0, no data are null.
|
||||
typedef struct SColumnInfoData {
|
||||
SColumnInfo info; // TODO filter info needs to be removed
|
||||
bool hasNull;// if current column data has null value.
|
||||
char *pData; // the corresponding block data in memory
|
||||
SColumnInfo info; // TODO filter info needs to be removed
|
||||
bool hasNull; // if current column data has null value.
|
||||
char* pData; // the corresponding block data in memory
|
||||
union {
|
||||
char *nullbitmap; // bitmap, one bit for each item in the list
|
||||
char* nullbitmap; // bitmap, one bit for each item in the list
|
||||
SVarColAttr varmeta;
|
||||
};
|
||||
} SColumnInfoData;
|
||||
|
@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
|
|||
int32_t tlen = 0;
|
||||
int32_t sz = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->committedOffset);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
|
||||
|
@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
|
|||
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->committedOffset);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
||||
|
@ -250,11 +249,11 @@ typedef struct SSqlExpr {
|
|||
char token[TSDB_COL_NAME_LEN]; // original token
|
||||
SSchema resSchema;
|
||||
|
||||
int32_t numOfCols;
|
||||
SColumn* pColumns; // data columns that are required by query
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
SVariant param[3]; // parameters are not more than 3
|
||||
int32_t numOfCols;
|
||||
SColumn* pColumns; // data columns that are required by query
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
SVariant param[3]; // parameters are not more than 3
|
||||
} SSqlExpr;
|
||||
|
||||
typedef struct SExprInfo {
|
||||
|
@ -271,7 +270,7 @@ typedef struct SSessionWindow {
|
|||
SColumn col;
|
||||
} SSessionWindow;
|
||||
|
||||
#define QUERY_ASC_FORWARD_STEP 1
|
||||
#define QUERY_ASC_FORWARD_STEP 1
|
||||
#define QUERY_DESC_FORWARD_STEP -1
|
||||
|
||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||
|
|
|
@ -422,8 +422,8 @@ typedef struct {
|
|||
} SColumnInfo;
|
||||
|
||||
typedef struct {
|
||||
uint64_t uid;
|
||||
TSKEY key; // last accessed ts, for subscription
|
||||
int64_t uid;
|
||||
TSKEY key; // last accessed ts, for subscription
|
||||
} STableIdInfo;
|
||||
|
||||
typedef struct STimeWindow {
|
||||
|
@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
|||
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
uint64_t uid;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t uid;
|
||||
} SDropDbRsp;
|
||||
|
||||
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
|
||||
|
@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
|||
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
uint64_t uid;
|
||||
int32_t vgVersion;
|
||||
int32_t vgNum;
|
||||
int8_t hashMethod;
|
||||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t uid;
|
||||
int32_t vgVersion;
|
||||
int32_t vgNum;
|
||||
int8_t hashMethod;
|
||||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||||
} SUseDbRsp;
|
||||
|
||||
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
||||
|
@ -725,7 +725,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int32_t dnodeId;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbUid;
|
||||
int64_t dbUid;
|
||||
int32_t vgVersion;
|
||||
int32_t cacheBlockSize;
|
||||
int32_t totalBlocks;
|
||||
|
@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR
|
|||
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t dnodeId;
|
||||
uint64_t dbUid;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
int32_t dnodeId;
|
||||
int64_t dbUid;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
|
||||
|
||||
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||||
|
@ -796,7 +796,7 @@ typedef struct {
|
|||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbId;
|
||||
int64_t dbId;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
int8_t precision;
|
||||
|
@ -1268,7 +1268,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t tuid;
|
||||
int64_t tuid;
|
||||
int32_t sverson;
|
||||
int32_t execLen;
|
||||
char* executor;
|
||||
|
@ -1279,11 +1279,11 @@ typedef struct {
|
|||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t tuid;
|
||||
int64_t tuid;
|
||||
} SDDropTopicReq;
|
||||
|
||||
typedef struct SVCreateTbReq {
|
||||
uint64_t ver; // use a general definition
|
||||
int64_t ver; // use a general definition
|
||||
char* name;
|
||||
uint32_t ttl;
|
||||
uint32_t keep;
|
||||
|
@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
|||
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
uint64_t ver; // use a general definition
|
||||
SArray* pArray;
|
||||
int64_t ver; // use a general definition
|
||||
SArray* pArray;
|
||||
} SVCreateTbBatchReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
|||
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
uint64_t ver;
|
||||
int64_t ver;
|
||||
char* name;
|
||||
uint8_t type;
|
||||
tb_uid_t suid;
|
||||
|
@ -1760,35 +1760,19 @@ typedef struct {
|
|||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqOffset;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
SArray* offsets; // SArray<SMqOffset>
|
||||
} SMqVgOffsets;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SMqOffset* offsets;
|
||||
} SMqCMResetOffsetReq;
|
||||
} SMqCMCommitOffsetReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SMqCMResetOffsetRsp;
|
||||
|
||||
typedef struct {
|
||||
int64_t leftForVer;
|
||||
SMqVgOffsets offsets;
|
||||
} SMqMVResetOffsetReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SMqMVResetOffsetRsp;
|
||||
} SMqCMCommitOffsetRsp;
|
||||
|
||||
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
|
||||
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
|
||||
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
|
||||
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
uint32_t nCols;
|
||||
|
@ -1870,7 +1854,6 @@ typedef struct {
|
|||
typedef struct {
|
||||
int64_t consumerId;
|
||||
SSchemaWrapper* schemas;
|
||||
int64_t committedOffset;
|
||||
int64_t reqOffset;
|
||||
int64_t rspOffset;
|
||||
int32_t skipLogNum;
|
||||
|
@ -1881,22 +1864,18 @@ typedef struct {
|
|||
// one req for one vg+topic
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
// 0: commit only, current offset
|
||||
// 1: consume only, poll next offset
|
||||
// 2: commit current and consume next offset
|
||||
int32_t reqType;
|
||||
|
||||
int64_t reqId;
|
||||
int64_t consumerId;
|
||||
int64_t blockingTime;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
|
||||
int64_t offset;
|
||||
int64_t currentOffset;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
} SMqConsumeReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
SEpSet epSet;
|
||||
} SMqSubVgEp;
|
||||
|
||||
|
@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos
|
|||
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
|
||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return buf;
|
||||
}
|
||||
|
|
|
@ -143,10 +143,10 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
@ -194,4 +194,3 @@ enum {
|
|||
TDMT_MAX
|
||||
#endif
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -113,14 +113,15 @@ typedef enum {
|
|||
SDB_USER = 7,
|
||||
SDB_AUTH = 8,
|
||||
SDB_ACCT = 9,
|
||||
SDB_SUBSCRIBE = 10,
|
||||
SDB_CONSUMER = 11,
|
||||
SDB_TOPIC = 12,
|
||||
SDB_VGROUP = 13,
|
||||
SDB_STB = 14,
|
||||
SDB_DB = 15,
|
||||
SDB_FUNC = 16,
|
||||
SDB_MAX = 17
|
||||
SDB_OFFSET = 10,
|
||||
SDB_SUBSCRIBE = 11,
|
||||
SDB_CONSUMER = 12,
|
||||
SDB_TOPIC = 13,
|
||||
SDB_VGROUP = 14,
|
||||
SDB_STB = 15,
|
||||
SDB_DB = 16,
|
||||
SDB_FUNC = 17,
|
||||
SDB_MAX = 18
|
||||
} ESdbType;
|
||||
|
||||
typedef struct SSdb SSdb;
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// clang-format off
|
||||
|
||||
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
|
||||
|
||||
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
|
||||
|
@ -260,6 +262,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
|
||||
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
||||
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
||||
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
|
||||
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||
|
||||
// dnode
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
|
||||
#ifndef _TD_UTIL_DEF_H
|
||||
#define _TD_UTIL_DEF_H
|
||||
|
||||
|
@ -196,6 +198,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_CONSUMER_GROUP_LEN 192
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
||||
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
|
||||
|
@ -349,9 +352,6 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||
|
||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||
|
||||
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
||||
|
||||
|
||||
|
|
|
@ -45,22 +45,24 @@ struct tmq_topic_vgroup_list_t {
|
|||
};
|
||||
|
||||
struct tmq_conf_t {
|
||||
char clientId[256];
|
||||
char groupId[256];
|
||||
bool auto_commit;
|
||||
char clientId[256];
|
||||
char groupId[256];
|
||||
int8_t auto_commit;
|
||||
int8_t resetOffset;
|
||||
tmq_commit_cb* commit_cb;
|
||||
/*char* ip;*/
|
||||
/*uint16_t port;*/
|
||||
tmq_commit_cb* commit_cb;
|
||||
};
|
||||
|
||||
struct tmq_t {
|
||||
// conf
|
||||
char groupId[256];
|
||||
char clientId[256];
|
||||
bool autoCommit;
|
||||
int8_t autoCommit;
|
||||
SRWLatch lock;
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
int32_t resetOffsetCfg;
|
||||
int64_t status;
|
||||
tsem_t rspSem;
|
||||
STscObj* pTscObj;
|
||||
|
@ -79,7 +81,6 @@ typedef struct {
|
|||
// statistics
|
||||
int64_t pollCnt;
|
||||
// offset
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
// connection info
|
||||
int32_t vgId;
|
||||
|
@ -115,21 +116,17 @@ typedef struct {
|
|||
} SMqConsumeCbParam;
|
||||
|
||||
typedef struct {
|
||||
tmq_t* tmq;
|
||||
SMqClientVg* pVg;
|
||||
int32_t async;
|
||||
tsem_t rspSem;
|
||||
} SMqCommitCbParam;
|
||||
|
||||
typedef struct {
|
||||
tmq_t* tmq;
|
||||
tmq_t* tmq;
|
||||
/*SMqClientVg* pVg;*/
|
||||
int32_t async;
|
||||
tsem_t rspSem;
|
||||
tmq_resp_err_t rspErr;
|
||||
} SMqResetOffsetParam;
|
||||
} SMqCommitCbParam;
|
||||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
|
||||
conf->auto_commit = false;
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -157,6 +154,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
if (strcmp(key, "auto.offset.reset") == 0) {
|
||||
if (strcmp(value, "none") == 0) {
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "earliest") == 0) {
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "latest") == 0) {
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
return TMQ_CONF_UNKNOWN;
|
||||
}
|
||||
|
||||
|
@ -190,14 +201,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
if (pParam->tmq->commit_cb) {
|
||||
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
|
||||
}
|
||||
if (!pParam->async) tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param;
|
||||
pParam->rspErr = code;
|
||||
tsem_post(&pParam->rspSem);
|
||||
if (!pParam->async)
|
||||
tsem_post(&pParam->rspSem);
|
||||
else {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(param);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -216,6 +225,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->autoCommit = conf->auto_commit;
|
||||
pTmq->commit_cb = conf->commit_cb;
|
||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||
|
||||
tsem_init(&pTmq->rspSem, 0, 0);
|
||||
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
|
||||
|
@ -223,18 +233,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
return pTmq;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||
// TODO: add read write lock
|
||||
SRequestObj* pRequest = NULL;
|
||||
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
|
||||
// build msg
|
||||
// send to mnode
|
||||
SMqCMResetOffsetReq req;
|
||||
req.num = offsets->cnt;
|
||||
req.offsets = (SMqOffset*)offsets->elems;
|
||||
SMqCMCommitOffsetReq req;
|
||||
SArray* pArray = NULL;
|
||||
|
||||
if (offsets == NULL) {
|
||||
pArray = taosArrayInit(0, sizeof(SMqOffset));
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
SMqOffset offset;
|
||||
strcpy(offset.topicName, pTopic->topicName);
|
||||
strcpy(offset.cgroup, tmq->groupId);
|
||||
offset.vgId = pVg->vgId;
|
||||
offset.offset = pVg->currentOffset;
|
||||
taosArrayPush(pArray, &offset);
|
||||
}
|
||||
}
|
||||
req.num = pArray->size;
|
||||
req.offsets = pArray->pData;
|
||||
} else {
|
||||
req.num = offsets->cnt;
|
||||
req.offsets = (SMqOffset*)offsets->elems;
|
||||
}
|
||||
|
||||
SCoder encoder;
|
||||
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
||||
int32_t tlen = encoder.pos;
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
|
@ -244,32 +276,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse
|
|||
tCoderClear(&encoder);
|
||||
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
|
||||
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
||||
tCoderClear(&encoder);
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET);
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc request");
|
||||
}
|
||||
|
||||
SMqResetOffsetParam param = {0};
|
||||
tsem_init(¶m.rspSem, 0, 0);
|
||||
param.tmq = tmq;
|
||||
SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam));
|
||||
if (pParam == NULL) {
|
||||
return -1;
|
||||
}
|
||||
pParam->tmq = tmq;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->param = ¶m;
|
||||
sendInfo->fp = tmqResetOffsetCb;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqCommitCb;
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
if (!async) {
|
||||
tsem_wait(&pParam->rspSem);
|
||||
resp = pParam->rspErr;
|
||||
}
|
||||
|
||||
return param.rspErr;
|
||||
if (pArray) {
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||
|
@ -641,8 +682,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
// clang-format off
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
.committedOffset = -1,
|
||||
.currentOffset = -1,
|
||||
.currentOffset = pVgEp->offset,
|
||||
.vgId = pVgEp->vgId,
|
||||
.epSet = pVgEp->epSet
|
||||
};
|
||||
|
@ -708,23 +748,51 @@ END:
|
|||
return 0;
|
||||
}
|
||||
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic,
|
||||
SMqClientVg* pVg) {
|
||||
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
||||
const SMqOffset* pOffset = &offset->offset;
|
||||
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
|
||||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
|
||||
int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
|
||||
for (int32_t j = 0; j < vgSz; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
|
||||
if (pVg->vgId == pOffset->vgId) {
|
||||
pVg->currentOffset = pOffset->offset;
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
int64_t reqOffset;
|
||||
if (pVg->currentOffset >= 0) {
|
||||
reqOffset = pVg->currentOffset;
|
||||
} else {
|
||||
if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
|
||||
tscError("unable to poll since no committed offset but reset offset is set to none");
|
||||
return NULL;
|
||||
}
|
||||
reqOffset = tmq->resetOffsetCfg;
|
||||
}
|
||||
|
||||
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
|
||||
if (pReq == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pReq->reqType = type;
|
||||
|
||||
strcpy(pReq->topic, pTopic->topicName);
|
||||
pReq->blockingTime = blocking_time;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
strcpy(pReq->cgroup, tmq->groupId);
|
||||
|
||||
if (type == TMQ_REQ_TYPE_COMMIT_ONLY) {
|
||||
pReq->offset = pVg->currentOffset;
|
||||
} else {
|
||||
pReq->offset = pVg->currentOffset + 1;
|
||||
}
|
||||
pReq->blockingTime = blocking_time;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->currentOffset = reqOffset;
|
||||
|
||||
pReq->head.vgId = htonl(pVg->vgId);
|
||||
pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
|
||||
|
@ -743,13 +811,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
|
||||
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
|
||||
printf("over1\n");
|
||||
/*printf("over1\n");*/
|
||||
usleep(blocking_time * 1000);
|
||||
return NULL;
|
||||
}
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
||||
if (taosArrayGetSize(pTopic->vgs) == 0) {
|
||||
printf("over2\n");
|
||||
/*printf("over2\n");*/
|
||||
usleep(blocking_time * 1000);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -760,8 +828,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
||||
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY;
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
ASSERT(false);
|
||||
usleep(blocking_time * 1000);
|
||||
|
@ -821,6 +888,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
/*return pRequest;*/
|
||||
}
|
||||
|
||||
#if 0
|
||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
|
||||
if (tmq_topic_vgroup_list != NULL) {
|
||||
// TODO
|
||||
|
@ -831,7 +899,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
|
|||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg);
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
|
||||
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
|
||||
|
@ -858,6 +926,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
void tmq_message_destroy(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return;
|
||||
|
|
|
@ -285,7 +285,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
|
|||
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||
int32_t tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedU64(buf, pReq->ver);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||
tlen += taosEncodeString(buf, pReq->name);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
||||
|
@ -330,7 +330,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
}
|
||||
|
||||
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||
buf = taosDecodeFixedU64(buf, &(pReq->ver));
|
||||
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
||||
buf = taosDecodeString(buf, &(pReq->name));
|
||||
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
||||
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
||||
|
@ -380,7 +380,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
|
||||
int32_t tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedU64(buf, pReq->ver);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
|
||||
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
|
||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
|
||||
|
@ -393,7 +393,7 @@ int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
|
|||
void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
|
||||
uint32_t nsize = 0;
|
||||
|
||||
buf = taosDecodeFixedU64(buf, &pReq->ver);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->ver);
|
||||
buf = taosDecodeFixedU32(buf, &nsize);
|
||||
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
|
||||
for (size_t i = 0; i < nsize; i++) {
|
||||
|
@ -407,14 +407,14 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
|
|||
|
||||
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedU64(buf, pReq->ver);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||
tlen += taosEncodeString(buf, pReq->name);
|
||||
tlen += taosEncodeFixedU8(buf, pReq->type);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
|
||||
buf = taosDecodeFixedU64(buf, &pReq->ver);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->ver);
|
||||
buf = taosDecodeString(buf, &pReq->name);
|
||||
buf = taosDecodeFixedU8(buf, &pReq->type);
|
||||
return buf;
|
||||
|
@ -1393,7 +1393,7 @@ int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
|
|||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pRsp->db) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pRsp->uid) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pRsp->uid) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -1407,7 +1407,7 @@ int32_t tDeserializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
|
|||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pRsp->db) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pRsp->uid) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pRsp->uid) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
|
@ -1468,7 +1468,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
|
|||
|
||||
static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
|
||||
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
|
||||
if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1;
|
||||
|
@ -1518,7 +1518,7 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp
|
|||
|
||||
int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
|
||||
if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1;
|
||||
if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pRsp->uid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1;
|
||||
|
@ -1661,7 +1661,7 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) {
|
|||
if (tEncodeCStr(pEncoder, pRsp->tbName) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pRsp->stbName) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->numOfTags) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->numOfColumns) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1;
|
||||
|
@ -1684,7 +1684,7 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) {
|
|||
if (tDecodeCStrTo(pDecoder, pRsp->tbName) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pRsp->stbName) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pRsp->dbId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->numOfTags) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->numOfColumns) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1;
|
||||
|
@ -2093,7 +2093,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1;
|
||||
|
@ -2133,7 +2133,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1;
|
||||
|
@ -2171,7 +2171,7 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq)
|
|||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -2187,7 +2187,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
|
|||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -2356,35 +2356,7 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) {
|
||||
if (tStartEncode(encoder) < 0) return -1;
|
||||
if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1;
|
||||
int32_t sz = taosArrayGetSize(pOffsets->offsets);
|
||||
if (tEncodeI32(encoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqOffset *offset = taosArrayGet(pOffsets->offsets, i);
|
||||
if (tEncodeSMqOffset(encoder, offset) < 0) return -1;
|
||||
}
|
||||
tEndEncode(encoder);
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) {
|
||||
int32_t sz;
|
||||
if (tStartDecode(decoder) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &sz) < 0) return -1;
|
||||
pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqOffset offset;
|
||||
if (tDecodeSMqOffset(decoder, &offset) < 0) return -1;
|
||||
taosArrayPush(pOffsets->offsets, &offset);
|
||||
}
|
||||
tEndDecode(decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
|
||||
int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq *pReq) {
|
||||
if (tStartEncode(encoder) < 0) return -1;
|
||||
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
|
@ -2394,7 +2366,7 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p
|
|||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
|
||||
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
|
||||
if (tStartDecode(decoder) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
|
@ -2405,23 +2377,3 @@ int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
|
|||
tEndDecode(decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
|
||||
if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||
}
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
if (pReq->offsets == NULL) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -25,8 +25,8 @@
|
|||
#include "dndMnode.h"
|
||||
#include "dndVnodes.h"
|
||||
|
||||
#define INTERNAL_USER "_dnd"
|
||||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_USER "_dnd"
|
||||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_SECRET "_pwd"
|
||||
|
||||
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||
|
@ -113,6 +113,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg;
|
||||
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg;
|
||||
|
@ -155,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||
SDnode * pDnode = parent;
|
||||
SDnode *pDnode = parent;
|
||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||
|
||||
tmsg_t msgType = pRsp->msgType;
|
||||
|
@ -219,7 +220,7 @@ static void dndCleanupClient(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||
SDnode * pDnode = param;
|
||||
SDnode *pDnode = param;
|
||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||
|
||||
tmsg_t msgType = pReq->msgType;
|
||||
|
@ -313,7 +314,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
|
|||
SAuthReq authReq = {0};
|
||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||
void * pReq = rpcMallocCont(contLen);
|
||||
void *pReq = rpcMallocCont(contLen);
|
||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||
|
||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
||||
|
|
|
@ -123,6 +123,7 @@ typedef enum {
|
|||
TRN_TYPE_DROP_TOPIC = 1015,
|
||||
TRN_TYPE_SUBSCRIBE = 1016,
|
||||
TRN_TYPE_REBALANCE = 1017,
|
||||
TRN_TYPE_COMMIT_OFFSET = 1018,
|
||||
TRN_TYPE_BASIC_SCOPE_END,
|
||||
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
||||
TRN_TYPE_CREATE_DNODE = 2001,
|
||||
|
@ -176,7 +177,7 @@ typedef struct {
|
|||
SArray* undoActions;
|
||||
int64_t createdTime;
|
||||
int64_t lastExecTime;
|
||||
uint64_t dbUid;
|
||||
int64_t dbUid;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
char lastError[TSDB_TRANS_ERROR_LEN];
|
||||
} STrans;
|
||||
|
@ -304,16 +305,16 @@ typedef struct {
|
|||
} SDbCfg;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_DB_FNAME_LEN];
|
||||
char acct[TSDB_USER_LEN];
|
||||
char createUser[TSDB_USER_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
uint64_t uid;
|
||||
int32_t cfgVersion;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod; // default is 1
|
||||
SDbCfg cfg;
|
||||
char name[TSDB_DB_FNAME_LEN];
|
||||
char acct[TSDB_USER_LEN];
|
||||
char createUser[TSDB_USER_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int32_t cfgVersion;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod; // default is 1
|
||||
SDbCfg cfg;
|
||||
} SDbObj;
|
||||
|
||||
typedef struct {
|
||||
|
@ -346,8 +347,8 @@ typedef struct {
|
|||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
uint64_t uid;
|
||||
uint64_t dbUid;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int32_t nextColId;
|
||||
int32_t numOfColumns;
|
||||
|
@ -465,6 +466,24 @@ static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) {
|
|||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_PARTITION_KEY_LEN];
|
||||
int64_t offset;
|
||||
} SMqOffsetObj;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pOffset->key);
|
||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
|
||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int32_t status;
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MND_OFFSET_H_
|
||||
#define _TD_MND_OFFSET_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitOffset(SMnode *pMnode);
|
||||
void mndCleanupOffset(SMnode *pMnode);
|
||||
|
||||
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key);
|
||||
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
|
||||
|
||||
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
|
||||
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
|
||||
|
||||
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
|
||||
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MND_OFFSET_H_*/
|
|
@ -0,0 +1,224 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndOffset.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_OFFSET_VER_NUMBER 1
|
||||
#define MND_OFFSET_RESERVE_SIZE 64
|
||||
|
||||
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset);
|
||||
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset);
|
||||
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset);
|
||||
static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pReq);
|
||||
|
||||
int32_t mndInitOffset(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_OFFSET,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndOffsetActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndOffsetActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndOffsetActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndOffsetActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndOffsetActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupOffset(SMnode *pMnode) {}
|
||||
|
||||
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset);
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto OFFSET_ENCODE_OVER;
|
||||
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto OFFSET_ENCODE_OVER;
|
||||
|
||||
void *abuf = buf;
|
||||
tEncodeSMqOffsetObj(&abuf, pOffset);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER);
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
OFFSET_ENCODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER;
|
||||
|
||||
if (sver != MND_OFFSET_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto OFFSET_DECODE_OVER;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SMqOffsetObj);
|
||||
SSdbRow *pRow = sdbAllocRow(size);
|
||||
if (pRow == NULL) goto OFFSET_DECODE_OVER;
|
||||
|
||||
SMqOffsetObj *pOffset = sdbGetRowObj(pRow);
|
||||
if (pOffset == NULL) goto OFFSET_DECODE_OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
int32_t tlen;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER);
|
||||
buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto OFFSET_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER);
|
||||
|
||||
if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) {
|
||||
goto OFFSET_DECODE_OVER;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
OFFSET_DECODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr());
|
||||
tfree(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset);
|
||||
return pRow;
|
||||
}
|
||||
|
||||
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
|
||||
int32_t code = 0;
|
||||
int32_t sz = taosArrayGetSize(vgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i);
|
||||
SMqOffsetObj offsetObj;
|
||||
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, pConsumerEp->vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
offsetObj.offset = -1;
|
||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
|
||||
if (pOffsetRaw == NULL) {
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
||||
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pMsg) {
|
||||
char key[TSDB_PARTITION_KEY_LEN];
|
||||
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
char *msgStr = pMsg->rpcMsg.pCont;
|
||||
SMqCMCommitOffsetReq commitOffsetReq;
|
||||
SCoder decoder;
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER);
|
||||
|
||||
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, &pMsg->rpcMsg);
|
||||
|
||||
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
|
||||
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
|
||||
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
|
||||
ASSERT(pOffsetObj);
|
||||
pOffsetObj->offset = pOffset->offset;
|
||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
|
||||
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
||||
mndTransAppendRedolog(pTrans, pOffsetRaw);
|
||||
mndReleaseOffset(pMnode, pOffsetObj);
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) {
|
||||
mTrace("offset:%s, perform insert action", pOffset->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
|
||||
mTrace("offset:%s, perform delete action", pOffset->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
|
||||
mTrace("offset:%s, perform update action", pOldOffset->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key);
|
||||
if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST;
|
||||
}
|
||||
return pOffset;
|
||||
}
|
||||
|
||||
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pOffset);
|
||||
}
|
||||
|
||||
static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndTopic.h"
|
||||
|
@ -80,13 +81,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) {
|
||||
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
|
||||
SMqSubscribeObj *pSub = tNewSubscribeObj();
|
||||
if (pSub == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name);
|
||||
char *key = mndMakeSubscribeKey(cgroup, pTopic->name);
|
||||
if (key == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tDeleteSMqSubscribeObj(pSub);
|
||||
|
@ -289,9 +290,15 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
|||
strcpy(topicEp.topic, topicName);
|
||||
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
|
||||
for (int32_t k = 0; k < vgsz; k++) {
|
||||
char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
|
||||
|
||||
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId};
|
||||
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1};
|
||||
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
|
||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
|
||||
if (pOffsetObj != NULL) {
|
||||
vgEp.offset = pOffsetObj->offset;
|
||||
mndReleaseOffset(pMnode, pOffsetObj);
|
||||
}
|
||||
taosArrayPush(topicEp.vgs, &vgEp);
|
||||
}
|
||||
taosArrayPush(rsp.topics, &topicEp);
|
||||
|
@ -870,7 +877,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
|||
|
||||
SUB_ENCODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != 0) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
|
@ -1085,6 +1092,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
|
||||
pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
|
||||
createSub = true;
|
||||
|
||||
mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
|
||||
}
|
||||
|
||||
SMqSubConsumer mqSubConsumer;
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "mndDnode.h"
|
||||
#include "mndFunc.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndProfile.h"
|
||||
#include "mndQnode.h"
|
||||
#include "mndShow.h"
|
||||
|
@ -77,7 +78,7 @@ static void mndTransReExecute(void *param, void *tmrId) {
|
|||
SMnode *pMnode = param;
|
||||
if (mndIsMaster(pMnode)) {
|
||||
int32_t contLen = 0;
|
||||
void * pReq = mndBuildTimerMsg(&contLen);
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen};
|
||||
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
@ -89,7 +90,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
|
|||
SMnode *pMnode = param;
|
||||
if (mndIsMaster(pMnode)) {
|
||||
int32_t contLen = 0;
|
||||
void * pReq = mndBuildTimerMsg(&contLen);
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
@ -197,6 +198,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
|||
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
||||
|
@ -440,7 +442,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->pMnode;
|
||||
int32_t code = 0;
|
||||
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
||||
void * ahandle = pMsg->rpcMsg.ahandle;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
bool isReq = (msgType & 1U);
|
||||
|
||||
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
|
||||
|
|
|
@ -207,9 +207,17 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
|||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SMqConsumeReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t fetchOffset = pReq->offset;
|
||||
int64_t fetchOffset;
|
||||
/*int64_t blockingTime = pReq->blockingTime;*/
|
||||
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
fetchOffset = 0;
|
||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||
fetchOffset = walGetLastVer(pTq->pWal);
|
||||
} else {
|
||||
fetchOffset = pReq->currentOffset + 1;
|
||||
}
|
||||
|
||||
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
|
||||
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||
|
@ -226,31 +234,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
|
||||
ASSERT(pConsumer->consumerId == consumerId);
|
||||
|
||||
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
|
||||
pTopic->committedOffset = pReq->offset;
|
||||
/*printf("offset %ld committed\n", pTopic->committedOffset);*/
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = 0;
|
||||
rpcSendResponse(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
|
||||
if (pTopic->committedOffset < pReq->offset - 1) {
|
||||
pTopic->committedOffset = pReq->offset - 1;
|
||||
/*printf("offset %ld committed\n", pTopic->committedOffset);*/
|
||||
}
|
||||
}
|
||||
|
||||
rsp.committedOffset = pTopic->committedOffset;
|
||||
rsp.reqOffset = pReq->offset;
|
||||
rsp.reqOffset = pReq->currentOffset;
|
||||
rsp.skipLogNum = 0;
|
||||
|
||||
if (fetchOffset <= pTopic->committedOffset) {
|
||||
fetchOffset = pTopic->committedOffset + 1;
|
||||
}
|
||||
|
||||
SWalHead* pHead;
|
||||
while (1) {
|
||||
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
|
||||
|
|
|
@ -22,14 +22,15 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||
|
||||
// ser request version
|
||||
// set request version
|
||||
void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int64_t ver = pVnode->state.processed++;
|
||||
taosEncodeFixedI64(&pBuf, ver);
|
||||
|
||||
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
/*ASSERT(false);*/
|
||||
// TODO: handle error
|
||||
/*ASSERT(false);*/
|
||||
vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
|
|||
} else {
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
|
||||
qError("submit msg error while set stream msg, %s" PRIx64, id);
|
||||
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue