Merge pull request #13919 from taosdata/feature/stream
refactor(tmq): offset storage
This commit is contained in:
commit
4958fc066f
|
@ -47,7 +47,7 @@ int32_t init_env() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -167,7 +167,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
assert(tmq);
|
assert(tmq);
|
||||||
|
@ -201,6 +201,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
/*} else {*/
|
/*} else {*/
|
||||||
/*break;*/
|
/*break;*/
|
||||||
|
/*tmq_commit_sync(tmq, NULL);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +240,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
msg_process(tmqmessage);
|
msg_process(tmqmessage);
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
|
|
||||||
/*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/
|
/*tmq_commit_sync(tmq, NULL);*/
|
||||||
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,47 +126,47 @@ typedef struct setConfRet {
|
||||||
char retMsg[RET_MSG_LENGTH];
|
char retMsg[RET_MSG_LENGTH];
|
||||||
} setConfRet;
|
} setConfRet;
|
||||||
|
|
||||||
DLL_EXPORT void taos_cleanup(void);
|
DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
||||||
DLL_EXPORT int taos_init(void);
|
DLL_EXPORT int taos_init(void);
|
||||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||||
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
||||||
DLL_EXPORT void taos_close(TAOS *taos);
|
DLL_EXPORT void taos_close(TAOS *taos);
|
||||||
|
|
||||||
const char *taos_data_type(int type);
|
const char *taos_data_type(int type);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
||||||
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||||
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
|
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
|
||||||
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
|
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
|
||||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||||
|
|
||||||
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
||||||
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
||||||
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
||||||
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
||||||
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
||||||
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
|
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
|
||||||
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
|
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
|
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
|
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
|
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
|
||||||
DLL_EXPORT void taos_free_result(TAOS_RES *res);
|
DLL_EXPORT void taos_free_result(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_field_count(TAOS_RES *res);
|
DLL_EXPORT int taos_field_count(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
|
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
|
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
|
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
|
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
|
||||||
|
@ -181,8 +181,8 @@ DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnInde
|
||||||
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
||||||
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
||||||
|
|
||||||
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
|
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
|
||||||
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
|
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
|
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
|
||||||
DLL_EXPORT const char *taos_get_client_info();
|
DLL_EXPORT const char *taos_get_client_info();
|
||||||
|
@ -192,8 +192,8 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
|
||||||
|
|
||||||
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
|
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES* res);
|
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
||||||
|
|
||||||
// Shuduo: temporary enable for app build
|
// Shuduo: temporary enable for app build
|
||||||
#if 1
|
#if 1
|
||||||
|
@ -240,11 +240,10 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
|
|
||||||
// timeout: -1 means infinitely waiting
|
// timeout: -1 means infinitely waiting
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
||||||
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
||||||
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
||||||
|
|
||||||
|
@ -275,11 +274,6 @@ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||||
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
|
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
|
||||||
|
|
||||||
#if 0
|
|
||||||
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
|
|
||||||
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* ------------------------------ TMQ END -------------------------------- */
|
/* ------------------------------ TMQ END -------------------------------- */
|
||||||
|
|
||||||
#if 1 // Shuduo: temporary enable for app build
|
#if 1 // Shuduo: temporary enable for app build
|
||||||
|
|
|
@ -35,6 +35,7 @@ enum {
|
||||||
TMQ_MSG_TYPE__DUMMY = 0,
|
TMQ_MSG_TYPE__DUMMY = 0,
|
||||||
TMQ_MSG_TYPE__POLL_RSP,
|
TMQ_MSG_TYPE__POLL_RSP,
|
||||||
TMQ_MSG_TYPE__EP_RSP,
|
TMQ_MSG_TYPE__EP_RSP,
|
||||||
|
TMQ_MSG_TYPE__END_RSP,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum EStreamType {
|
typedef enum EStreamType {
|
||||||
|
|
|
@ -429,7 +429,8 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09)
|
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09)
|
||||||
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A)
|
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A)
|
||||||
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
|
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
|
||||||
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B)
|
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C)
|
||||||
|
#define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D)
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000)
|
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000)
|
||||||
|
|
|
@ -203,10 +203,10 @@ typedef struct {
|
||||||
int32_t totalRspNum;
|
int32_t totalRspNum;
|
||||||
tmq_resp_err_t rspErr;
|
tmq_resp_err_t rspErr;
|
||||||
tmq_commit_cb* userCb;
|
tmq_commit_cb* userCb;
|
||||||
SArray* successfulOffsets;
|
/*SArray* successfulOffsets;*/
|
||||||
SArray* failedOffsets;
|
/*SArray* failedOffsets;*/
|
||||||
void* userParam;
|
void* userParam;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
} SMqCommitCbParamSet;
|
} SMqCommitCbParamSet;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -368,11 +368,16 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
|
||||||
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
|
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
|
||||||
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
|
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
|
||||||
// push into array
|
// push into array
|
||||||
|
#if 0
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
|
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
|
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*tscDebug("receive offset commit cb of %s on vg %d, offset is %ld", pParam->pOffset->subKey, pParam->->vgId,
|
||||||
|
* pOffset->version);*/
|
||||||
|
|
||||||
// count down waiting rsp
|
// count down waiting rsp
|
||||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||||
|
@ -388,10 +393,14 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
|
||||||
// sem post
|
// sem post
|
||||||
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
|
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
tsem_post(&pParamSet->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -415,17 +424,35 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
|
|
||||||
|
tscDebug("consumer %ld begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
|
||||||
|
(int32_t)taosArrayGetSize(pTopic->vgs));
|
||||||
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
|
|
||||||
|
tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId);
|
||||||
|
|
||||||
|
/*if (pVg->currentOffset < 0) {*/
|
||||||
|
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
|
||||||
|
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
|
||||||
|
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
|
||||||
if (pOffset == NULL) {
|
if (pOffset == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t tlen = strlen(tmq->groupId);
|
pOffset->type = TMQ_OFFSET__LOG;
|
||||||
memcpy(pOffset->subKey, tmq->groupId, tlen);
|
pOffset->version = pVg->currentOffset;
|
||||||
pOffset->subKey[tlen] = TMQ_SEPARATOR;
|
|
||||||
strcpy(pOffset->subKey + tlen + 1, pTopic->topicName);
|
int32_t groupLen = strlen(tmq->groupId);
|
||||||
|
memcpy(pOffset->subKey, tmq->groupId, groupLen);
|
||||||
|
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
|
||||||
|
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
|
||||||
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||||
|
@ -454,25 +481,36 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
||||||
}
|
}
|
||||||
pMsgSendInfo->msgInfo = (SDataBuf){
|
pMsgSendInfo->msgInfo = (SDataBuf){
|
||||||
.pData = buf,
|
.pData = buf,
|
||||||
.len = len,
|
.len = sizeof(SMsgHead) + len,
|
||||||
.handle = NULL,
|
.handle = NULL,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
|
||||||
|
pOffset->version);
|
||||||
|
|
||||||
|
// TODO: put into cb
|
||||||
|
pVg->committedOffset = pVg->currentOffset;
|
||||||
|
|
||||||
pMsgSendInfo->requestId = generateRequestId();
|
pMsgSendInfo->requestId = generateRequestId();
|
||||||
pMsgSendInfo->requestObjRefId = 0;
|
pMsgSendInfo->requestObjRefId = 0;
|
||||||
pMsgSendInfo->param = pParam;
|
pMsgSendInfo->param = pParam;
|
||||||
pMsgSendInfo->fp = tmqCommitCb2;
|
pMsgSendInfo->fp = tmqCommitCb2;
|
||||||
pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
|
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
|
||||||
// send msg
|
// send msg
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
||||||
pParamSet->waitingRspNum++;
|
pParamSet->waitingRspNum++;
|
||||||
pParamSet->totalRspNum++;
|
pParamSet->totalRspNum++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pParamSet->totalRspNum == 0) {
|
||||||
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
taosMemoryFree(pParamSet);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (!async) {
|
if (!async) {
|
||||||
tsem_wait(&pParamSet->rspSem);
|
tsem_wait(&pParamSet->rspSem);
|
||||||
code = pParamSet->rspErr;
|
code = pParamSet->rspErr;
|
||||||
|
@ -489,10 +527,12 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (!async) {
|
if (!async) {
|
||||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -648,7 +688,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
||||||
tmqAskEp(tmq, true);
|
tmqAskEp(tmq, true);
|
||||||
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
||||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||||
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
|
tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
|
||||||
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
|
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
|
||||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||||
} else {
|
} else {
|
||||||
|
@ -819,7 +859,7 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||||
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
|
@ -943,6 +983,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
|
tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
|
||||||
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||||
|
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||||
|
if (pRspWrapper == NULL) {
|
||||||
|
taosMemoryFree(pMsg->pData);
|
||||||
|
tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
|
||||||
|
goto CREATE_MSG_FAIL;
|
||||||
|
}
|
||||||
|
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
||||||
|
/*pRspWrapper->vgHandle = pVg;*/
|
||||||
|
/*pRspWrapper->topicHandle = pTopic;*/
|
||||||
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
|
tsem_post(&tmq->rspSem);
|
||||||
|
}
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1017,14 +1070,13 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
if (pTopicCur->vgs) {
|
if (pTopicCur->vgs) {
|
||||||
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
||||||
tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
|
tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
|
||||||
if (vgNumCur == 0) break;
|
|
||||||
for (int32_t j = 0; j < vgNumCur; j++) {
|
for (int32_t j = 0; j < vgNumCur; j++) {
|
||||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
||||||
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
|
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
|
||||||
tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
|
tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey,
|
||||||
|
pVgCur->currentOffset);
|
||||||
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1032,7 +1084,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
SMqClientTopic topic = {0};
|
SMqClientTopic topic = {0};
|
||||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||||
topic.schema = pTopicEp->schema;
|
topic.schema = pTopicEp->schema;
|
||||||
taosHashClear(pHash);
|
|
||||||
topic.topicName = strdup(pTopicEp->topic);
|
topic.topicName = strdup(pTopicEp->topic);
|
||||||
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
|
@ -1049,7 +1100,8 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
offset = *pOffset;
|
offset = *pOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,
|
||||||
|
pVgEp->vgId, offset, vgKey);
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
.pollCnt = 0,
|
.pollCnt = 0,
|
||||||
.currentOffset = offset,
|
.currentOffset = offset,
|
||||||
|
@ -1076,6 +1128,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
return set;
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
/*printf("call update ep %d\n", epoch);*/
|
/*printf("call update ep %d\n", epoch);*/
|
||||||
bool set = false;
|
bool set = false;
|
||||||
|
@ -1160,6 +1213,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
atomic_store_32(&tmq->epoch, epoch);
|
atomic_store_32(&tmq->epoch, epoch);
|
||||||
return set;
|
return set;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||||
|
@ -1186,7 +1240,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
||||||
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
||||||
tmqUpdateEp(tmq, head->epoch, &rsp);
|
tmqUpdateEp2(tmq, head->epoch, &rsp);
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
|
||||||
|
@ -1311,10 +1365,10 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
||||||
if (pVg->currentOffset >= 0) {
|
if (pVg->currentOffset >= 0) {
|
||||||
reqOffset = pVg->currentOffset;
|
reqOffset = pVg->currentOffset;
|
||||||
} else {
|
} else {
|
||||||
if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
|
/*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
|
||||||
tscError("unable to poll since no committed offset but reset offset is set to none");
|
/*tscError("unable to poll since no committed offset but reset offset is set to none");*/
|
||||||
return NULL;
|
/*return NULL;*/
|
||||||
}
|
/*}*/
|
||||||
reqOffset = tmq->resetOffsetCfg;
|
reqOffset = tmq->resetOffsetCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1326,10 +1380,10 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
||||||
/*strcpy(pReq->topic, pTopic->topicName);*/
|
/*strcpy(pReq->topic, pTopic->topicName);*/
|
||||||
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
||||||
|
|
||||||
int32_t tlen = strlen(tmq->groupId);
|
int32_t groupLen = strlen(tmq->groupId);
|
||||||
memcpy(pReq->subKey, tmq->groupId, tlen);
|
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
||||||
pReq->subKey[tlen] = TMQ_SEPARATOR;
|
pReq->subKey[groupLen] = TMQ_SEPARATOR;
|
||||||
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
|
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
|
||||||
|
|
||||||
pReq->withTbName = tmq->withTbName;
|
pReq->withTbName = tmq->withTbName;
|
||||||
pReq->timeout = timeout;
|
pReq->timeout = timeout;
|
||||||
|
@ -1440,7 +1494,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
||||||
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
||||||
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||||
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
||||||
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
|
tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg);
|
||||||
/*tmqClearUnhandleMsg(tmq);*/
|
/*tmqClearUnhandleMsg(tmq);*/
|
||||||
*pReset = true;
|
*pReset = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1462,7 +1516,11 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
if (rspWrapper == NULL) return NULL;
|
if (rspWrapper == NULL) return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
|
taosFreeQitem(rspWrapper);
|
||||||
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
|
return NULL;
|
||||||
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
||||||
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
|
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
@ -1523,6 +1581,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
|
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
if (timeout != -1) {
|
if (timeout != -1) {
|
||||||
int64_t endTime = taosGetTimestampMs();
|
int64_t endTime = taosGetTimestampMs();
|
||||||
|
@ -1608,9 +1668,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
||||||
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
|
tmqCommitInner2(tmq, offsets, 0, 1, cb, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||||
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
|
return tmqCommitInner2(tmq, offsets, 0, 0, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,17 +124,20 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
||||||
tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid,
|
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
|
||||||
offset.ts);
|
pTq->pVnode->config.vgId, offset.uid, offset.ts);
|
||||||
} else if (offset.type == TMQ_OFFSET__LOG) {
|
} else if (offset.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version);
|
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
|
||||||
|
pTq->pVnode->config.vgId, offset.version);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
if (pOffset == NULL || pOffset->version < offset.version) {
|
||||||
ASSERT(0);
|
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||||
return -1;
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -149,16 +152,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// get offset to fetch message
|
// get offset to fetch message
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
if (pReq->currentOffset >= 0) {
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
|
||||||
fetchOffset = walGetCommittedVer(pTq->pWal);
|
|
||||||
} else {
|
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
fetchOffset = pReq->currentOffset + 1;
|
||||||
|
} else {
|
||||||
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
|
||||||
|
if (pOffset != NULL) {
|
||||||
|
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||||
|
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
||||||
|
pTq->pVnode->config.vgId, pOffset->version);
|
||||||
|
fetchOffset = pOffset->version + 1;
|
||||||
|
} else {
|
||||||
|
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||||
|
fetchOffset = walGetFirstVer(pTq->pWal);
|
||||||
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||||
|
fetchOffset = walGetCommittedVer(pTq->pWal);
|
||||||
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
||||||
|
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
|
||||||
|
pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
||||||
|
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
/*ASSERT(pHandle);*/
|
/*ASSERT(pHandle);*/
|
||||||
|
|
|
@ -15,7 +15,4 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
int tqCommit(STQ* pTq) {
|
int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); }
|
||||||
// do nothing
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -92,6 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
|
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
|
||||||
|
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||||
|
ASSERT(pOffset->version >= 0);
|
||||||
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
|
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +131,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
|
||||||
tEncodeSTqOffset(&encoder, pOffset);
|
tEncodeSTqOffset(&encoder, pOffset);
|
||||||
// write file
|
// write file
|
||||||
int64_t writeLen;
|
int64_t writeLen;
|
||||||
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) {
|
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen);
|
tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen);
|
||||||
taosHashCancelIterate(pStore->pHash, pIter);
|
taosHashCancelIterate(pStore->pHash, pIter);
|
||||||
|
|
|
@ -577,6 +577,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
||||||
|
|
||||||
|
//tq
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
|
||||||
|
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ typedef struct {
|
||||||
|
|
||||||
tmq_t* tmq;
|
tmq_t* tmq;
|
||||||
tmq_list_t* topicList;
|
tmq_list_t* topicList;
|
||||||
|
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
|
@ -74,7 +74,7 @@ typedef struct {
|
||||||
char cdbName[32];
|
char cdbName[32];
|
||||||
char dbName[32];
|
char dbName[32];
|
||||||
int32_t showMsgFlag;
|
int32_t showMsgFlag;
|
||||||
int32_t showRowFlag;
|
int32_t showRowFlag;
|
||||||
int32_t saveRowFlag;
|
int32_t saveRowFlag;
|
||||||
int32_t consumeDelay; // unit s
|
int32_t consumeDelay; // unit s
|
||||||
int32_t numOfThread;
|
int32_t numOfThread;
|
||||||
|
@ -108,26 +108,20 @@ static void printHelp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* getCurrentTimeString(char* timeString) {
|
char* getCurrentTimeString(char* timeString) {
|
||||||
time_t tTime = taosGetTimestampSec();
|
time_t tTime = taosGetTimestampSec();
|
||||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
struct tm tm = *taosLocalTime(&tTime, NULL);
|
||||||
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d",
|
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
|
||||||
tm.tm_year + 1900,
|
tm.tm_min, tm.tm_sec);
|
||||||
tm.tm_mon + 1,
|
|
||||||
tm.tm_mday,
|
|
||||||
tm.tm_hour,
|
|
||||||
tm.tm_min,
|
|
||||||
tm.tm_sec);
|
|
||||||
|
|
||||||
return timeString;
|
return timeString;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void initLogFile() {
|
void initLogFile() {
|
||||||
char filename[256];
|
char filename[256];
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
|
|
||||||
sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
|
sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
|
||||||
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
for (int i = 2; i < sizeof(filename); i++) {
|
for (int i = 2; i < sizeof(filename); i++) {
|
||||||
if (filename[i] == ':') filename[i] = '-';
|
if (filename[i] == ':') filename[i] = '-';
|
||||||
|
@ -249,17 +243,18 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||||
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
|
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
|
||||||
pInfo->rowsOfPerVgroups[i][1] += rows;
|
pInfo->rowsOfPerVgroups[i][1] += rows;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
|
||||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||||
pInfo->numOfVgroups++;
|
pInfo->numOfVgroups++;
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
||||||
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||||
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId);
|
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
||||||
|
pInfo->numOfVgroups, vgroupId);
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
@ -277,7 +272,8 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||||
assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
|
|
||||||
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
|
sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
|
||||||
|
pInfo->ts++, buf);
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -295,12 +291,13 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
|
|
||||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
|
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
|
||||||
//taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
|
// taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId,
|
||||||
|
// tmq_get_table_name(msg));
|
||||||
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
|
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -316,9 +313,9 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
||||||
const char* tbName = tmq_get_table_name(msg);
|
const char* tbName = tmq_get_table_name(msg);
|
||||||
|
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf);
|
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
|
||||||
if (0 != g_stConfInfo.saveRowFlag) {
|
if (0 != g_stConfInfo.saveRowFlag) {
|
||||||
saveConsumeContentToTbl(pInfo, buf);
|
saveConsumeContentToTbl(pInfo, buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,7 +323,7 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||||
|
|
||||||
return totalRows;
|
return totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -401,16 +398,11 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
sprintf(sqlStr, "insert into %s.consumeresult values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
|
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
|
||||||
g_stConfInfo.cdbName,
|
g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
||||||
now,
|
|
||||||
pInfo->consumerId,
|
|
||||||
pInfo->consumeMsgCnt,
|
|
||||||
pInfo->consumeRowCnt,
|
|
||||||
pInfo->checkresult);
|
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
|
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
|
@ -421,7 +413,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// vgroups
|
// vgroups
|
||||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
|
@ -445,7 +437,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -457,7 +449,8 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
int64_t totalRows = 0;
|
int64_t totalRows = 0;
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
||||||
|
pInfo->consumerId);
|
||||||
|
|
||||||
pInfo->ts = taosGetTimestampMs();
|
pInfo->ts = taosGetTimestampMs();
|
||||||
|
|
||||||
|
@ -473,7 +466,7 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
if (totalRows >= pInfo->expectMsgCnt) {
|
if (totalRows >= pInfo->expectMsgCnt) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -519,6 +512,8 @@ void* consumeThreadFunc(void* param) {
|
||||||
pPrint("tmq_commit() manual commit when consume end.\n");
|
pPrint("tmq_commit() manual commit when consume end.\n");
|
||||||
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
||||||
tmq_commit_sync(pInfo->tmq, NULL);
|
tmq_commit_sync(pInfo->tmq, NULL);
|
||||||
|
taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
|
||||||
|
pPrint("tmq_commit() manual commit over.\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_unsubscribe(pInfo->tmq);
|
err = tmq_unsubscribe(pInfo->tmq);
|
||||||
|
|
Loading…
Reference in New Issue