commit
7412491c91
|
@ -173,6 +173,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||||
|
// build msg
|
||||||
|
// send to mnode
|
||||||
|
return TMQ_RESP_ERR__SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||||
pParam->rspErr = code;
|
pParam->rspErr = code;
|
||||||
|
@ -704,7 +710,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
||||||
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||||
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_COMMIT_ONLY;
|
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY;
|
||||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
|
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
|
|
Loading…
Reference in New Issue