Merge pull request #13940 from taosdata/feature/stream
feat(tmq): support commit one msg
This commit is contained in:
commit
5b8b75c8ee
|
@ -146,8 +146,8 @@ int32_t create_topic() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
|
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param);
|
printf("commit %d tmq %p param %p\n", code, tmq, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_t* build_consumer() {
|
tmq_t* build_consumer() {
|
||||||
|
@ -183,10 +183,10 @@ tmq_list_t* build_topic_list() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
tmq_resp_err_t err;
|
int32_t code;
|
||||||
|
|
||||||
if ((err = tmq_subscribe(tmq, topics))) {
|
if ((code = tmq_subscribe(tmq, topics))) {
|
||||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
|
||||||
printf("subscribe err\n");
|
printf("subscribe err\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -205,9 +205,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_consumer_close(tmq);
|
code = tmq_consumer_close(tmq);
|
||||||
if (err)
|
if (code)
|
||||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
|
||||||
else
|
else
|
||||||
fprintf(stderr, "%% Consumer closed\n");
|
fprintf(stderr, "%% Consumer closed\n");
|
||||||
}
|
}
|
||||||
|
@ -215,11 +215,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
static const int MIN_COMMIT_COUNT = 1;
|
static const int MIN_COMMIT_COUNT = 1;
|
||||||
|
|
||||||
int msg_count = 0;
|
int msg_count = 0;
|
||||||
tmq_resp_err_t err;
|
int32_t code;
|
||||||
|
|
||||||
if ((err = tmq_subscribe(tmq, topics))) {
|
if ((code = tmq_subscribe(tmq, topics))) {
|
||||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,9 +245,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_consumer_close(tmq);
|
code = tmq_consumer_close(tmq);
|
||||||
if (err)
|
if (code)
|
||||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
|
||||||
else
|
else
|
||||||
fprintf(stderr, "%% Consumer closed\n");
|
fprintf(stderr, "%% Consumer closed\n");
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,21 +209,20 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
|
||||||
|
|
||||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||||
|
|
||||||
|
#if 0
|
||||||
enum {
|
enum {
|
||||||
TMQ_RESP_ERR__FAIL = -1,
|
TMQ_RESP_ERR__FAIL = -1,
|
||||||
TMQ_RESP_ERR__SUCCESS = 0,
|
TMQ_RESP_ERR__SUCCESS = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef int32_t tmq_resp_err_t;
|
typedef int32_t tmq_resp_err_t;
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct tmq_t tmq_t;
|
typedef struct tmq_t tmq_t;
|
||||||
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
|
|
||||||
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
|
|
||||||
|
|
||||||
typedef struct tmq_conf_t tmq_conf_t;
|
typedef struct tmq_conf_t tmq_conf_t;
|
||||||
typedef struct tmq_list_t tmq_list_t;
|
typedef struct tmq_list_t tmq_list_t;
|
||||||
|
|
||||||
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
|
typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
|
||||||
|
|
||||||
DLL_EXPORT tmq_list_t *tmq_list_new();
|
DLL_EXPORT tmq_list_t *tmq_list_new();
|
||||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
||||||
|
@ -233,24 +232,19 @@ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
|
||||||
|
|
||||||
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||||
|
|
||||||
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
DLL_EXPORT const char *tmq_err2str(int32_t code);
|
||||||
|
|
||||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||||
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
// timeout: -1 means infinitely waiting
|
// timeout: -1 means infinitely waiting
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||||
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||||
|
|
||||||
#if 0
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||||
|
|
||||||
|
|
|
@ -710,6 +710,8 @@ int32_t* taosGetErrno();
|
||||||
//index
|
//index
|
||||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||||
|
|
||||||
|
//tmq
|
||||||
|
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,14 +48,6 @@ struct tmq_list_t {
|
||||||
SArray container;
|
SArray container;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tmq_topic_vgroup_t {
|
|
||||||
SMqOffset offset;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct tmq_topic_vgroup_list_t {
|
|
||||||
SArray container; // SArray<tmq_topic_vgroup_t*>
|
|
||||||
};
|
|
||||||
|
|
||||||
struct tmq_conf_t {
|
struct tmq_conf_t {
|
||||||
char clientId[256];
|
char clientId[256];
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
|
@ -161,9 +153,9 @@ typedef struct {
|
||||||
} SMqPollRspWrapper;
|
} SMqPollRspWrapper;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tmq_t* tmq;
|
tmq_t* tmq;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tmq_resp_err_t rspErr;
|
int32_t rspErr;
|
||||||
} SMqSubscribeCbParam;
|
} SMqSubscribeCbParam;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -189,7 +181,7 @@ typedef struct {
|
||||||
int8_t freeOffsets;
|
int8_t freeOffsets;
|
||||||
tmq_commit_cb* userCb;
|
tmq_commit_cb* userCb;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tmq_resp_err_t rspErr;
|
int32_t rspErr;
|
||||||
SArray* offsets;
|
SArray* offsets;
|
||||||
void* userParam;
|
void* userParam;
|
||||||
} SMqCommitCbParam;
|
} SMqCommitCbParam;
|
||||||
|
@ -201,7 +193,7 @@ typedef struct {
|
||||||
int8_t freeOffsets;
|
int8_t freeOffsets;
|
||||||
int32_t waitingRspNum;
|
int32_t waitingRspNum;
|
||||||
int32_t totalRspNum;
|
int32_t totalRspNum;
|
||||||
tmq_resp_err_t rspErr;
|
int32_t rspErr;
|
||||||
tmq_commit_cb* userCb;
|
tmq_commit_cb* userCb;
|
||||||
/*SArray* successfulOffsets;*/
|
/*SArray* successfulOffsets;*/
|
||||||
/*SArray* failedOffsets;*/
|
/*SArray* failedOffsets;*/
|
||||||
|
@ -347,10 +339,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pParam->rspErr = code;
|
pParam->rspErr = code;
|
||||||
if (pParam->async) {
|
if (pParam->async) {
|
||||||
if (pParam->automatic && pParam->tmq->commitCb) {
|
if (pParam->automatic && pParam->tmq->commitCb) {
|
||||||
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
|
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
|
||||||
pParam->tmq->commitCbUserParam);
|
|
||||||
} else if (!pParam->automatic && pParam->userCb) {
|
} else if (!pParam->automatic && pParam->userCb) {
|
||||||
pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
|
pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pParam->freeOffsets) {
|
if (pParam->freeOffsets) {
|
||||||
|
@ -388,10 +379,10 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
|
||||||
if (pParamSet->async) {
|
if (pParamSet->async) {
|
||||||
// call async cb func
|
// call async cb func
|
||||||
if (pParamSet->automatic && pParamSet->tmq->commitCb) {
|
if (pParamSet->automatic && pParamSet->tmq->commitCb) {
|
||||||
pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam);
|
pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
|
||||||
} else if (!pParamSet->automatic && pParamSet->userCb) {
|
} else if (!pParamSet->automatic && pParamSet->userCb) {
|
||||||
// sem post
|
// sem post
|
||||||
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
|
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsem_post(&pParamSet->rspSem);
|
tsem_post(&pParamSet->rspSem);
|
||||||
|
@ -405,10 +396,132 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
|
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
|
||||||
tmq_commit_cb* userCb, void* userParam) {
|
void* userParam) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
|
if (msg != NULL) {
|
||||||
|
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
||||||
|
if (!TD_RES_TMQ(pRspObj)) {
|
||||||
|
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||||
|
if (pParamSet == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pParamSet->tmq = tmq;
|
||||||
|
pParamSet->automatic = automatic;
|
||||||
|
pParamSet->async = async;
|
||||||
|
pParamSet->freeOffsets = 1;
|
||||||
|
pParamSet->userCb = userCb;
|
||||||
|
pParamSet->userParam = userParam;
|
||||||
|
tsem_init(&pParamSet->rspSem, 0, 0);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
|
if (strcmp(pTopic->topicName, pRspObj->topic) == 0) {
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||||
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
|
if (pVg->vgId == pRspObj->vgId) {
|
||||||
|
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
|
||||||
|
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
|
||||||
|
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
|
||||||
|
if (pOffset == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pOffset->type = TMQ_OFFSET__LOG;
|
||||||
|
pOffset->version = pVg->currentOffset;
|
||||||
|
|
||||||
|
int32_t groupLen = strlen(tmq->groupId);
|
||||||
|
memcpy(pOffset->subKey, tmq->groupId, groupLen);
|
||||||
|
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
|
||||||
|
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
|
||||||
|
|
||||||
|
int32_t len;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
tEncodeSTqOffset(&encoder, pOffset);
|
||||||
|
|
||||||
|
// build param
|
||||||
|
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
|
||||||
|
pParam->params = pParamSet;
|
||||||
|
pParam->pOffset = pOffset;
|
||||||
|
|
||||||
|
// build send info
|
||||||
|
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
|
if (pMsgSendInfo == NULL) {
|
||||||
|
// TODO
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pMsgSendInfo->msgInfo = (SDataBuf){
|
||||||
|
.pData = buf,
|
||||||
|
.len = sizeof(SMsgHead) + len,
|
||||||
|
.handle = NULL,
|
||||||
|
};
|
||||||
|
|
||||||
|
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey,
|
||||||
|
pVg->vgId, pOffset->version);
|
||||||
|
|
||||||
|
// TODO: put into cb
|
||||||
|
pVg->committedOffset = pVg->currentOffset;
|
||||||
|
|
||||||
|
pMsgSendInfo->requestId = generateRequestId();
|
||||||
|
pMsgSendInfo->requestObjRefId = 0;
|
||||||
|
pMsgSendInfo->param = pParam;
|
||||||
|
pMsgSendInfo->fp = tmqCommitCb2;
|
||||||
|
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
|
||||||
|
// send msg
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
pParamSet->waitingRspNum++;
|
||||||
|
pParamSet->totalRspNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pParamSet->totalRspNum == 0) {
|
||||||
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
taosMemoryFree(pParamSet);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!async) {
|
||||||
|
tsem_wait(&pParamSet->rspSem);
|
||||||
|
code = pParamSet->rspErr;
|
||||||
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
} else {
|
||||||
|
code = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0 && async) {
|
||||||
|
if (automatic) {
|
||||||
|
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||||
|
} else {
|
||||||
|
userCb(tmq, code, userParam);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||||
if (pParamSet == NULL) {
|
if (pParamSet == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -521,9 +634,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
||||||
|
|
||||||
if (code != 0 && async) {
|
if (code != 0 && async) {
|
||||||
if (automatic) {
|
if (automatic) {
|
||||||
tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam);
|
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||||
} else {
|
} else {
|
||||||
userCb(tmq, code, NULL, userParam);
|
userCb(tmq, code, userParam);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -537,7 +650,8 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
|
#if 0
|
||||||
|
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
|
||||||
tmq_commit_cb* userCb, void* userParam) {
|
tmq_commit_cb* userCb, void* userParam) {
|
||||||
SMqCMCommitOffsetReq req;
|
SMqCMCommitOffsetReq req;
|
||||||
SArray* pOffsets = NULL;
|
SArray* pOffsets = NULL;
|
||||||
|
@ -547,7 +661,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
|
||||||
int8_t freeOffsets;
|
int8_t freeOffsets;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
if (offsets == NULL) {
|
if (msg == NULL) {
|
||||||
freeOffsets = 1;
|
freeOffsets = 1;
|
||||||
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
|
@ -564,7 +678,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
freeOffsets = 0;
|
freeOffsets = 0;
|
||||||
pOffsets = (SArray*)&offsets->container;
|
pOffsets = (SArray*)&msg->container;
|
||||||
}
|
}
|
||||||
|
|
||||||
req.num = (int32_t)pOffsets->size;
|
req.num = (int32_t)pOffsets->size;
|
||||||
|
@ -651,6 +765,7 @@ END:
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
||||||
tmq_t* tmq = (tmq_t*)param;
|
tmq_t* tmq = (tmq_t*)param;
|
||||||
|
@ -729,7 +844,7 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
||||||
if (*topics == NULL) {
|
if (*topics == NULL) {
|
||||||
*topics = tmq_list_new();
|
*topics = tmq_list_new();
|
||||||
}
|
}
|
||||||
|
@ -737,12 +852,12 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
||||||
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
|
||||||
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
|
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
|
||||||
}
|
}
|
||||||
return TMQ_RESP_ERR__SUCCESS;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
|
int32_t tmq_unsubscribe(tmq_t* tmq) {
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
|
int32_t rsp = tmq_subscribe(tmq, lst);
|
||||||
tmq_list_destroy(lst);
|
tmq_list_destroy(lst);
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
|
@ -858,11 +973,13 @@ FAIL:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
#if 0
|
||||||
|
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||||
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
const SArray* container = &topic_list->container;
|
const SArray* container = &topic_list->container;
|
||||||
int32_t sz = taosArrayGetSize(container);
|
int32_t sz = taosArrayGetSize(container);
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
|
@ -902,7 +1019,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
if (sendInfo == NULL) goto FAIL;
|
if (sendInfo == NULL) goto FAIL;
|
||||||
|
|
||||||
SMqSubscribeCbParam param = {
|
SMqSubscribeCbParam param = {
|
||||||
.rspErr = TMQ_RESP_ERR__SUCCESS,
|
.rspErr = 0,
|
||||||
.tmq = tmq,
|
.tmq = tmq,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1337,7 +1454,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
#if 0
|
||||||
|
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
||||||
const SMqOffset* pOffset = &offset->offset;
|
const SMqOffset* pOffset = &offset->offset;
|
||||||
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
|
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
|
||||||
return TMQ_RESP_ERR__FAIL;
|
return TMQ_RESP_ERR__FAIL;
|
||||||
|
@ -1359,6 +1477,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
||||||
}
|
}
|
||||||
return TMQ_RESP_ERR__FAIL;
|
return TMQ_RESP_ERR__FAIL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||||
int64_t reqOffset;
|
int64_t reqOffset;
|
||||||
|
@ -1599,10 +1718,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
|
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
||||||
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
|
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
||||||
if (rsp != TMQ_RESP_ERR__SUCCESS) {
|
if (rsp != 0) {
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1610,18 +1729,18 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
rsp = tmq_subscribe(tmq, lst);
|
rsp = tmq_subscribe(tmq, lst);
|
||||||
tmq_list_destroy(lst);
|
tmq_list_destroy(lst);
|
||||||
|
|
||||||
if (rsp != TMQ_RESP_ERR__SUCCESS) {
|
if (rsp != 0) {
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: free resources
|
// TODO: free resources
|
||||||
return TMQ_RESP_ERR__SUCCESS;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* tmq_err2str(tmq_resp_err_t err) {
|
const char* tmq_err2str(int32_t err) {
|
||||||
if (err == TMQ_RESP_ERR__SUCCESS) {
|
if (err == 0) {
|
||||||
return "success";
|
return "success";
|
||||||
} else if (err == TMQ_RESP_ERR__FAIL) {
|
} else if (err == -1) {
|
||||||
return "fail";
|
return "fail";
|
||||||
} else {
|
} else {
|
||||||
return tstrerror(err);
|
return tstrerror(err);
|
||||||
|
@ -1667,10 +1786,8 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||||
tmqCommitInner2(tmq, offsets, 0, 1, cb, param);
|
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); }
|
||||||
return tmqCommitInner2(tmq, offsets, 0, 0, NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
|
@ -583,6 +583,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||||
|
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -353,8 +353,8 @@ tmq_list_t* build_topic_list() {
|
||||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
static const int MIN_COMMIT_COUNT = 1000;
|
static const int MIN_COMMIT_COUNT = 1000;
|
||||||
|
|
||||||
int msg_count = 0;
|
int msg_count = 0;
|
||||||
tmq_resp_err_t err;
|
int32_t err;
|
||||||
|
|
||||||
if ((err = tmq_subscribe(tmq, topics))) {
|
if ((err = tmq_subscribe(tmq, topics))) {
|
||||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||||
|
@ -379,7 +379,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
|
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
|
||||||
tmq_resp_err_t err;
|
int32_t err;
|
||||||
|
|
||||||
if ((err = tmq_subscribe(tmq, topics))) {
|
if ((err = tmq_subscribe(tmq, topics))) {
|
||||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||||
|
|
|
@ -339,8 +339,8 @@ int queryDB(TAOS* taos, char* command) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
|
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
pError("tmq_commit_cb_print() commit %d\n", resp);
|
pError("tmq_commit_cb_print() commit %d\n", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void build_consumer(SThreadInfo* pInfo) {
|
void build_consumer(SThreadInfo* pInfo) {
|
||||||
|
@ -443,7 +443,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void loop_consume(SThreadInfo* pInfo) {
|
void loop_consume(SThreadInfo* pInfo) {
|
||||||
tmq_resp_err_t err;
|
int32_t code;
|
||||||
|
|
||||||
int64_t totalMsgs = 0;
|
int64_t totalMsgs = 0;
|
||||||
int64_t totalRows = 0;
|
int64_t totalRows = 0;
|
||||||
|
@ -496,8 +496,8 @@ void* consumeThreadFunc(void* param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
|
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
|
||||||
if (err) {
|
if (err != 0) {
|
||||||
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
@ -517,14 +517,14 @@ void* consumeThreadFunc(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_unsubscribe(pInfo->tmq);
|
err = tmq_unsubscribe(pInfo->tmq);
|
||||||
if (err) {
|
if (err != 0) {
|
||||||
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
/*pInfo->consumeMsgCnt = -1;*/
|
/*pInfo->consumeMsgCnt = -1;*/
|
||||||
/*return NULL;*/
|
/*return NULL;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_consumer_close(pInfo->tmq);
|
err = tmq_consumer_close(pInfo->tmq);
|
||||||
if (err) {
|
if (err != 0) {
|
||||||
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||||
/*exit(-1);*/
|
/*exit(-1);*/
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue