Merge pull request #16119 from taosdata/feature/stream

refactor(tmq): put write logic into taosx.c
This commit is contained in:
Liu Jicong 2022-08-15 23:00:06 +08:00 committed by GitHub
commit 3e33fda68d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1857 additions and 1773 deletions

View File

@ -28,7 +28,8 @@ function runConsumer() {
console.log(msg.topicPartition); console.log(msg.topicPartition);
console.log(msg.block); console.log(msg.block);
console.log(msg.fields) console.log(msg.fields)
consumer.commit(msg); // fixme(@xiaolei): commented temp, should be fixed.
//consumer.commit(msg);
console.log(`=======consumer ${i} done`) console.log(`=======consumer ${i} done`)
} }
@ -48,4 +49,4 @@ try {
cursor.close(); cursor.close();
conn.close(); conn.close();
}, 2000); }, 2000);
} }

View File

@ -45,10 +45,9 @@ static int32_t msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg); int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++; rows++;
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf); printf("row content: %s\n", buf);
} }
return rows; return rows;
@ -167,7 +166,7 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"); pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -199,9 +198,7 @@ tmq_t* build_consumer() {
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true"); code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
@ -220,14 +217,7 @@ tmq_list_t* build_topic_list() {
return topicList; return topicList;
} }
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) { void basic_consume_loop(tmq_t* tmq) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
return;
}
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t msgCnt = 0; int32_t msgCnt = 0;
int32_t timeout = 5000; int32_t timeout = 5000;
@ -237,8 +227,8 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
msgCnt++; msgCnt++;
totalRows += msg_process(tmqmsg); totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
/*} else {*/ } else {
/*break;*/ break;
} }
} }
@ -267,14 +257,12 @@ int main(int argc, char* argv[]) {
return -1; return -1;
} }
basic_consume_loop(tmq, topic_list); if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe\n");
} }
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
code = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (code) { if (code) {

View File

@ -131,10 +131,10 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config); DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void); DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos); DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type); const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
@ -244,33 +244,37 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum tmq_res_t { enum tmq_res_t {
TMQ_RES_INVALID = -1, TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1, TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2, TMQ_RES_TABLE_META = 2,
}; };
typedef struct tmq_raw_data{ typedef struct tmq_raw_data {
void* raw; void *raw;
uint32_t raw_len; uint32_t raw_len;
uint16_t raw_type; uint16_t raw_type;
} tmq_raw_data; } tmq_raw_data;
typedef enum tmq_res_t tmq_res_t; typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw); DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw); DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname); DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw); DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT void tmq_free_json_meta(char *jsonMeta);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */ /* ---------------------------- TAOSX END -------------------------------- */
typedef enum { typedef enum {
TSDB_SRV_STATUS_UNAVAILABLE = 0, TSDB_SRV_STATUS_UNAVAILABLE = 0,

View File

@ -622,6 +622,7 @@ int32_t* taosGetErrno();
//tmq //tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -29,11 +29,11 @@ int32_t taosOpenRef(int32_t max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef // close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t taosCloseRef(int32_t refId); int32_t taosCloseRef(int32_t rsetId);
// add ref, p is the pointer to resource or pointer ID // add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t taosAddRef(int32_t refId, void *p); int64_t taosAddRef(int32_t rsetId, void *p);
// remove ref, rid is the reference ID returned by taosAddRef // remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately

View File

@ -192,6 +192,7 @@ void taos_free_result(TAOS_RES *res) {
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo); doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp); taosMemoryFree(pRspObj->metaRsp.metaRsp);

1628
source/client/src/taosx.c Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -624,6 +624,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file"
//tmq //tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };