refactor: do some internal refactor and add the sample code.
This commit is contained in:
parent
72ddd1676d
commit
62dad38d5c
|
@ -20,7 +20,8 @@
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
|
||||||
static int running = 1;
|
static int running = 1;
|
||||||
|
const char* topic_name = "topicname";
|
||||||
|
|
||||||
static int32_t msg_process(TAOS_RES* msg) {
|
static int32_t msg_process(TAOS_RES* msg) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
|
@ -243,7 +244,7 @@ _end:
|
||||||
|
|
||||||
tmq_list_t* build_topic_list() {
|
tmq_list_t* build_topic_list() {
|
||||||
tmq_list_t* topicList = tmq_list_new();
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
int32_t code = tmq_list_append(topicList, "topicname");
|
int32_t code = tmq_list_append(topicList, topic_name);
|
||||||
if (code) {
|
if (code) {
|
||||||
tmq_list_destroy(topicList);
|
tmq_list_destroy(topicList);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -269,6 +270,31 @@ void basic_consume_loop(tmq_t* tmq) {
|
||||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void consume_repeatly(tmq_t* tmq) {
|
||||||
|
int32_t numOfAssignment = 0;
|
||||||
|
tmq_topic_assignment* pAssign = NULL;
|
||||||
|
|
||||||
|
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
// seek to the earliest offset
|
||||||
|
for(int32_t i = 0; i < numOfAssignment; ++i) {
|
||||||
|
tmq_topic_assignment* p = &pAssign[i];
|
||||||
|
|
||||||
|
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
free(pAssign);
|
||||||
|
|
||||||
|
// let's do it again
|
||||||
|
basic_consume_loop(tmq);
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
@ -294,10 +320,13 @@ int main(int argc, char* argv[]) {
|
||||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||||
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_list_destroy(topic_list);
|
tmq_list_destroy(topic_list);
|
||||||
|
|
||||||
basic_consume_loop(tmq);
|
basic_consume_loop(tmq);
|
||||||
|
|
||||||
|
consume_repeatly(tmq);
|
||||||
|
|
||||||
code = tmq_consumer_close(tmq);
|
code = tmq_consumer_close(tmq);
|
||||||
if (code) {
|
if (code) {
|
||||||
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
||||||
|
|
|
@ -263,7 +263,7 @@ DLL_EXPORT const char *tmq_err2str(int32_t code);
|
||||||
|
|
||||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||||
typedef struct tmq_topic_assignment {
|
typedef struct tmq_topic_assignment {
|
||||||
int32_t vgroupHandle;
|
int32_t vgId;
|
||||||
int64_t currentOffset;
|
int64_t currentOffset;
|
||||||
int64_t begin;
|
int64_t begin;
|
||||||
int64_t end;
|
int64_t end;
|
||||||
|
@ -277,7 +277,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||||
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment);
|
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment);
|
||||||
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset);
|
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgId, int64_t offset);
|
||||||
|
|
||||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||||
|
|
||||||
|
|
|
@ -2357,7 +2357,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tmq_topic_assignment assignment = {.begin = pHead->walsver,
|
tmq_topic_assignment assignment = {.begin = pHead->walsver,
|
||||||
.end = pHead->walever,
|
.end = pHead->walever,
|
||||||
.currentOffset = rsp.rspOffset.version,
|
.currentOffset = rsp.rspOffset.version,
|
||||||
.vgroupHandle = pParam->vgId};
|
.vgId = pParam->vgId};
|
||||||
|
|
||||||
taosThreadMutexLock(&pCommon->mutex);
|
taosThreadMutexLock(&pCommon->mutex);
|
||||||
taosArrayPush(pCommon->pList, &assignment);
|
taosArrayPush(pCommon->pList, &assignment);
|
||||||
|
@ -2422,7 +2422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
|
|
||||||
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
||||||
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
||||||
pAssignment->vgroupHandle = pClientVg->vgId;
|
pAssignment->vgId = pClientVg->vgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needFetch) {
|
if (needFetch) {
|
||||||
|
@ -2524,7 +2524,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) {
|
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
|
||||||
if (tmq == NULL) {
|
if (tmq == NULL) {
|
||||||
tscError("invalid tmq handle, null");
|
tscError("invalid tmq handle, null");
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
@ -2544,14 +2544,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
|
||||||
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
|
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
|
||||||
for (int32_t i = 0; i < numOfVgs; ++i) {
|
for (int32_t i = 0; i < numOfVgs; ++i) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
||||||
if (pClientVg->vgId == vgroupHandle) {
|
if (pClientVg->vgId == vgId) {
|
||||||
pVg = pClientVg;
|
pVg = pClientVg;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVg == NULL) {
|
if (pVg == NULL) {
|
||||||
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle);
|
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue