add tmq reset offset
This commit is contained in:
parent
2541d19124
commit
7bbca5e4b5
|
@ -28,7 +28,7 @@ int32_t init_env() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1733,7 +1733,7 @@ typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
} SMqSetCVgRsp;
|
} SMqSetCVgRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1741,9 +1741,42 @@ typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
} SMqMVRebRsp;
|
} SMqMVRebRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
int64_t offset;
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
|
} SMqOffset;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t num;
|
||||||
|
SMqOffset* offsets;
|
||||||
|
} SMqCMResetOffsetReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t reserved;
|
||||||
|
} SMqCMResetOffsetRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t num;
|
||||||
|
SMqOffset* offsets;
|
||||||
|
} SMqMVResetOffsetReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t reserved;
|
||||||
|
} SMqMVResetOffsetRsp;
|
||||||
|
|
||||||
|
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
|
||||||
|
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
|
||||||
|
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
|
||||||
|
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
|
||||||
|
|
||||||
|
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
|
||||||
|
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t nCols;
|
uint32_t nCols;
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
|
|
|
@ -142,6 +142,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||||
|
|
|
@ -35,9 +35,7 @@ struct tmq_list_t {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tmq_topic_vgroup_t {
|
struct tmq_topic_vgroup_t {
|
||||||
char* topic;
|
SMqOffset offset;
|
||||||
int32_t vgId;
|
|
||||||
int64_t offset;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tmq_topic_vgroup_list_t {
|
struct tmq_topic_vgroup_list_t {
|
||||||
|
@ -123,6 +121,12 @@ typedef struct {
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
} SMqCommitCbParam;
|
} SMqCommitCbParam;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
tmq_t* tmq;
|
||||||
|
tsem_t rspSem;
|
||||||
|
tmq_resp_err_t rspErr;
|
||||||
|
} SMqResetOffsetParam;
|
||||||
|
|
||||||
tmq_conf_t* tmq_conf_new() {
|
tmq_conf_t* tmq_conf_new() {
|
||||||
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
|
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
|
||||||
conf->auto_commit = false;
|
conf->auto_commit = false;
|
||||||
|
@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
|
||||||
// build msg
|
|
||||||
// send to mnode
|
|
||||||
return TMQ_RESP_ERR__SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||||
pParam->rspErr = code;
|
pParam->rspErr = code;
|
||||||
|
@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param;
|
||||||
|
pParam->rspErr = code;
|
||||||
|
tsem_post(&pParam->rspSem);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
|
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
|
||||||
if (pTmq == NULL) {
|
if (pTmq == NULL) {
|
||||||
|
@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
||||||
return pTmq;
|
return pTmq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
// build msg
|
||||||
|
// send to mnode
|
||||||
|
SMqCMResetOffsetReq req;
|
||||||
|
req.num = offsets->cnt;
|
||||||
|
req.offsets = (SMqOffset*)offsets->elems;
|
||||||
|
|
||||||
|
SCoder encoder;
|
||||||
|
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||||
|
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
void* buf = malloc(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
|
||||||
|
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET);
|
||||||
|
if (pRequest == NULL) {
|
||||||
|
tscError("failed to malloc request");
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqResetOffsetParam param = {0};
|
||||||
|
tsem_init(¶m.rspSem, 0, 0);
|
||||||
|
param.tmq = tmq;
|
||||||
|
|
||||||
|
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||||
|
|
||||||
|
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||||
|
sendInfo->param = ¶m;
|
||||||
|
sendInfo->fp = tmqResetOffsetCb;
|
||||||
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
|
tsem_wait(¶m.rspSem);
|
||||||
|
tsem_destroy(¶m.rspSem);
|
||||||
|
|
||||||
|
return TMQ_RESP_ERR__SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
int32_t sz = topic_list->cnt;
|
int32_t sz = topic_list->cnt;
|
||||||
|
@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
|
|
||||||
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
|
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||||
if (topicFname == NULL) {
|
if (topicFname == NULL) {
|
||||||
|
goto _return;
|
||||||
}
|
}
|
||||||
tNameExtractFullName(&name, topicFname);
|
tNameExtractFullName(&name, topicFname);
|
||||||
tscDebug("subscribe topic: %s", topicFname);
|
tscDebug("subscribe topic: %s", topicFname);
|
||||||
|
@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
.nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL};
|
.nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL};
|
||||||
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
||||||
taosArrayPush(tmq->clientTopics, &topic);
|
taosArrayPush(tmq->clientTopics, &topic);
|
||||||
/*SMqClientTopic topic = {*/
|
|
||||||
/*.*/
|
|
||||||
/*};*/
|
|
||||||
taosArrayPush(req.topicNames, &topicFname);
|
taosArrayPush(req.topicNames, &topicFname);
|
||||||
free(dbName);
|
free(dbName);
|
||||||
}
|
}
|
||||||
|
@ -270,7 +322,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
|
|
||||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
||||||
if (pRequest == NULL) {
|
if (pRequest == NULL) {
|
||||||
tscError("failed to malloc sqlObj");
|
tscError("failed to malloc request");
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
|
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
|
||||||
|
|
|
@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) {
|
||||||
|
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
||||||
|
if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1;
|
||||||
|
if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1;
|
||||||
|
return encoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
|
||||||
|
if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1;
|
||||||
|
if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
|
||||||
|
if (tStartEncode(encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < pReq->num; i++) {
|
||||||
|
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||||
|
}
|
||||||
|
tEndEncode(encoder);
|
||||||
|
return encoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
|
||||||
|
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||||
|
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||||
|
if (pReq->offsets == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < pReq->num; i++) {
|
||||||
|
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
|
||||||
|
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < pReq->num; i++) {
|
||||||
|
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||||
|
}
|
||||||
|
return encoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
|
||||||
|
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||||
|
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||||
|
if (pReq->offsets == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < pReq->num; i++) {
|
||||||
|
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -345,6 +345,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||||
|
int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
|
||||||
|
for (int32_t i = 0; i < removeSz; i++) {
|
||||||
|
char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i);
|
||||||
|
free(topicName);
|
||||||
|
}
|
||||||
|
taosArrayClear(pConsumer->recentRemovedTopics);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
|
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
|
||||||
|
@ -1013,6 +1021,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
char *oldTopicNameDup = strdup(oldTopicName);
|
||||||
|
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
|
||||||
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
|
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
|
||||||
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
|
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
|
||||||
} else if (newTopicName != NULL) {
|
} else if (newTopicName != NULL) {
|
||||||
|
|
|
@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
||||||
if (pTopic->pReadhandle == NULL) {
|
if (pTopic->pReadhandle == NULL) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
|
||||||
pTopic->buffer.output[i].status = 0;
|
pTopic->buffer.output[j].status = 0;
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||||
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
|
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
|
||||||
pTopic->buffer.output[i].pReadHandle = pReadHandle;
|
pTopic->buffer.output[j].pReadHandle = pReadHandle;
|
||||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue