diff --git a/example/src/tmq.c b/example/src/tmq.c
index 99e0c443dd..5858282aab 100644
--- a/example/src/tmq.c
+++ b/example/src/tmq.c
@@ -13,16 +13,14 @@
* along with this program. If not, see .
*/
+#include
#include
#include
-#include
#include
#include "taos.h"
-static int running = 1;
-static void msg_process(tmq_message_t* message) {
- tmqShowMsg(message);
-}
+static int running = 1;
+static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@@ -44,29 +42,28 @@ int32_t init_env() {
}
taos_free_result(pRes);
- /*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/
- /*if (taos_errno(pRes) != 0) {*/
- /*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
- /*return -1;*/
- /*}*/
- /*taos_free_result(pRes);*/
+ pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
- /*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/
- /*if (taos_errno(pRes) != 0) {*/
- /*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/
- /*return -1;*/
- /*}*/
- /*taos_free_result(pRes);*/
+ pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
- /*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/
- /*if (taos_errno(pRes) != 0) {*/
- /*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/
- /*return -1;*/
- /*}*/
- /*taos_free_result(pRes);*/
+ pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
-
- const char* sql = "select * from st1";
+ const char* sql = "select * from tu1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
@@ -95,7 +92,7 @@ tmq_t* build_consumer() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
- return NULL;
+ return NULL;
}
tmq_list_t* build_topic_list() {
@@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
-void basic_consume_loop(tmq_t *tmq,
- tmq_list_t *topics) {
+void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
@@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq,
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
- tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
+ tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
- /*} else {*/
+ /*} else {*/
/*break;*/
}
}
@@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
-void sync_consume_loop(tmq_t *tmq,
- tmq_list_t *topics) {
+void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
- int msg_count = 0;
+ int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
@@ -148,15 +143,47 @@ void sync_consume_loop(tmq_t *tmq,
}
while (running) {
- tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
+ tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
- if ((++msg_count % MIN_COMMIT_COUNT) == 0)
- tmq_commit(tmq, NULL, 0);
+ if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
- }
+ }
+
+ err = tmq_consumer_close(tmq);
+ if (err)
+ fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
+ else
+ fprintf(stderr, "%% Consumer closed\n");
+}
+
+void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
+ tmq_resp_err_t err;
+
+ if ((err = tmq_subscribe(tmq, topics))) {
+ fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
+ printf("subscribe err\n");
+ return;
+ }
+ int32_t batchCnt = 0;
+ int32_t skipLogNum = 0;
+ clock_t startTime = clock();
+ while (running) {
+ tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
+ if (tmqmessage) {
+ batchCnt++;
+ skipLogNum += tmqGetSkipLogNum(tmqmessage);
+ /*msg_process(tmqmessage);*/
+ tmq_message_destroy(tmqmessage);
+ } else {
+ break;
+ }
+ }
+ clock_t endTime = clock();
+ printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
+ (double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
@@ -168,8 +195,9 @@ void sync_consume_loop(tmq_t *tmq,
int main() {
int code;
code = init_env();
- tmq_t* tmq = build_consumer();
+ tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
- basic_consume_loop(tmq, topic_list);
+ perf_loop(tmq, topic_list);
+ /*basic_consume_loop(tmq, topic_list);*/
/*sync_consume_loop(tmq, topic_list);*/
}
diff --git a/include/client/taos.h b/include/client/taos.h
index a8627a43da..9b676d1286 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -16,8 +16,8 @@
#ifndef TDENGINE_TAOS_H
#define TDENGINE_TAOS_H
-#include
#include
+#include
#ifdef __cplusplus
extern "C" {
@@ -31,26 +31,26 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW;
// Data type definition
-#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
-#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
-#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
-#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
-#define TSDB_DATA_TYPE_INT 4 // 4 bytes
-#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
-#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
-#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
-#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
-#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
-#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
-#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
-#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
-#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
-#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
-#define TSDB_DATA_TYPE_VARCHAR 15 // string
-#define TSDB_DATA_TYPE_VARBINARY 16 // binary
-#define TSDB_DATA_TYPE_JSON 17 // json
-#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
-#define TSDB_DATA_TYPE_BLOB 19 // binary
+#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
+#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
+#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
+#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
+#define TSDB_DATA_TYPE_INT 4 // 4 bytes
+#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
+#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
+#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
+#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
+#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
+#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
+#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
+#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
+#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
+#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
+#define TSDB_DATA_TYPE_VARCHAR 15 // string
+#define TSDB_DATA_TYPE_VARBINARY 16 // binary
+#define TSDB_DATA_TYPE_JSON 17 // json
+#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
+#define TSDB_DATA_TYPE_BLOB 19 // binary
typedef enum {
TSDB_OPTION_LOCALE,
@@ -63,9 +63,9 @@ typedef enum {
typedef enum {
TSDB_SML_UNKNOWN_PROTOCOL = 0,
- TSDB_SML_LINE_PROTOCOL = 1,
- TSDB_SML_TELNET_PROTOCOL = 2,
- TSDB_SML_JSON_PROTOCOL = 3,
+ TSDB_SML_LINE_PROTOCOL = 1,
+ TSDB_SML_TELNET_PROTOCOL = 2,
+ TSDB_SML_JSON_PROTOCOL = 3,
} TSDB_SML_PROTOCOL_TYPE;
typedef enum {
@@ -79,28 +79,28 @@ typedef enum {
} TSDB_SML_TIMESTAMP_TYPE;
typedef struct taosField {
- char name[65];
- int8_t type;
- int32_t bytes;
+ char name[65];
+ int8_t type;
+ int32_t bytes;
} TAOS_FIELD;
#ifdef _TD_GO_DLL_
- #define DLL_EXPORT __declspec(dllexport)
+#define DLL_EXPORT __declspec(dllexport)
#else
- #define DLL_EXPORT
+#define DLL_EXPORT
#endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND {
- int buffer_type;
- void * buffer;
- uintptr_t buffer_length; // unused
- uintptr_t *length;
- int * is_null;
+ int buffer_type;
+ void *buffer;
+ uintptr_t buffer_length; // unused
+ uintptr_t *length;
+ int *is_null;
- int is_unsigned; // unused
- int * error; // unused
+ int is_unsigned; // unused
+ int *error; // unused
union {
int64_t ts;
int8_t b;
@@ -113,22 +113,23 @@ typedef struct TAOS_BIND {
unsigned char *bin;
char *nchar;
} u;
- unsigned int allocated;
+ unsigned int allocated;
} TAOS_BIND;
typedef struct TAOS_MULTI_BIND {
- int buffer_type;
- void *buffer;
- uintptr_t buffer_length;
- int32_t *length;
- char *is_null;
- int num;
+ int buffer_type;
+ void *buffer;
+ uintptr_t buffer_length;
+ int32_t *length;
+ char *is_null;
+ int num;
} TAOS_MULTI_BIND;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
-DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
+DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
+ const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
@@ -136,62 +137,63 @@ const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
-DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
-DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
-DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name);
+DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
+DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
+DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
-DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
-DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
-DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
-DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
-DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind);
-DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx);
-DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
-DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
-DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
-DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
-DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
+DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
+DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
+DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
+DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
+DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
+DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
+DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
+DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
+DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
+DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
+DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
-DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
-DLL_EXPORT void taos_free_result(TAOS_RES *res);
-DLL_EXPORT int taos_field_count(TAOS_RES *res);
-DLL_EXPORT int taos_num_fields(TAOS_RES *res);
-DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
+DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
+DLL_EXPORT void taos_free_result(TAOS_RES *res);
+DLL_EXPORT int taos_field_count(TAOS_RES *res);
+DLL_EXPORT int taos_num_fields(TAOS_RES *res);
+DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
-DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
-DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
-DLL_EXPORT void taos_stop_query(TAOS_RES *res);
-DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
-DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
-DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
+DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
+DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
+DLL_EXPORT void taos_stop_query(TAOS_RES *res);
+DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
+DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
+DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
-DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
+DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *tres);
-DLL_EXPORT int taos_errno(TAOS_RES *tres);
+DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
-typedef void (*__taos_sub_fn_t)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
-DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval);
+typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
+DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
+ void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
- int64_t stime, void *param, void (*callback)(void *));
-DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
+ int64_t stime, void *param, void (*callback)(void *));
+DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
-DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
-DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
+DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
@@ -215,10 +217,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
-DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
-DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
-DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
-DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t);
+DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
+DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
+DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
@@ -227,7 +229,7 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
-DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
+DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
@@ -251,8 +253,9 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
-//temporary used function for demo only
-void tmqShowMsg(tmq_message_t* tmq_message);
+// temporary used function for demo only
+void tmqShowMsg(tmq_message_t *tmq_message);
+int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#ifdef __cplusplus
}
diff --git a/include/common/common.h b/include/common/common.h
index 31f905d47f..f2518e5945 100644
--- a/include/common/common.h
+++ b/include/common/common.h
@@ -17,35 +17,35 @@
#define TDENGINE_COMMON_H
#include "taosdef.h"
-#include "tmsg.h"
#include "tarray.h"
+#include "tmsg.h"
#include "tvariant.h"
-//typedef struct STimeWindow {
-// TSKEY skey;
-// TSKEY ekey;
-//} STimeWindow;
+// typedef struct STimeWindow {
+// TSKEY skey;
+// TSKEY ekey;
+// } STimeWindow;
-//typedef struct {
-// int32_t dataLen;
-// char name[TSDB_TABLE_FNAME_LEN];
-// char *data;
-//} STagData;
+// typedef struct {
+// int32_t dataLen;
+// char name[TSDB_TABLE_FNAME_LEN];
+// char *data;
+// } STagData;
-//typedef struct SSchema {
-// uint8_t type;
-// char name[TSDB_COL_NAME_LEN];
-// int16_t colId;
-// int16_t bytes;
-//} SSchema;
+// typedef struct SSchema {
+// uint8_t type;
+// char name[TSDB_COL_NAME_LEN];
+// int16_t colId;
+// int16_t bytes;
+// } SSchema;
-#define TMQ_REQ_TYPE_COMMIT_ONLY 0
-#define TMQ_REQ_TYPE_CONSUME_ONLY 1
+#define TMQ_REQ_TYPE_COMMIT_ONLY 0
+#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
typedef struct {
uint32_t numOfTables;
- SArray *pGroupList;
- SHashObj *map; // speedup acquire the tableQueryInfo by table uid
+ SArray* pGroupList;
+ SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SColumnDataAgg {
@@ -67,25 +67,25 @@ typedef struct SDataBlockInfo {
typedef struct SConstantItem {
SColumnInfo info;
- int32_t startRow; // run-length-encoding to save the space for multiple rows
+ int32_t startRow; // run-length-encoding to save the space for multiple rows
int32_t endRow;
SVariant value;
} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock {
- SColumnDataAgg *pBlockAgg;
- SArray *pDataBlock; // SArray
- SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value.
- SDataBlockInfo info;
+ SColumnDataAgg* pBlockAgg;
+ SArray* pDataBlock; // SArray
+ SArray* pConstantList; // SArray, it is a constant/tags value of the corresponding result value.
+ SDataBlockInfo info;
} SSDataBlock;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
- SColumnInfo info; // TODO filter info needs to be removed
- char *nullbitmap;//
- char *pData; // the corresponding block data in memory
+ SColumnInfo info; // TODO filter info needs to be removed
+ char* nullbitmap; //
+ char* pData; // the corresponding block data in memory
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
@@ -110,7 +110,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo
return tlen;
}
-static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
+static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
@@ -127,7 +127,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
}
- return buf;
+ return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
@@ -146,7 +146,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
- SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i);
+ SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
tlen += tEncodeDataBlock(buf, pBlock);
}
return tlen;
@@ -179,19 +179,18 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
return;
}
- //int32_t numOfOutput = pBlock->info.numOfCols;
+ // int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
- for(int32_t i = 0; i < sz; ++i) {
+ for (int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
- //tfree(pBlock);
+ // tfree(pBlock);
}
-
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
if (pRsp->schemas) {
if (pRsp->schemas->nCols) {
@@ -199,53 +198,53 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
}
free(pRsp->schemas);
}
- taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock);
- pRsp->pBlockData = NULL;
- //for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
- //SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
- //tDeleteSSDataBlock(pDataBlock);
+ taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
+ pRsp->pBlockData = NULL;
+ // for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
+ // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
+ // tDeleteSSDataBlock(pDataBlock);
//}
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SColumn {
- uint64_t uid;
- char name[TSDB_COL_NAME_LEN];
- int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
- SColumnInfo info;
+ uint64_t uid;
+ char name[TSDB_COL_NAME_LEN];
+ int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
+ SColumnInfo info;
} SColumn;
typedef struct SLimit {
- int64_t limit;
- int64_t offset;
+ int64_t limit;
+ int64_t offset;
} SLimit;
typedef struct SOrder {
- uint32_t order;
- SColumn col;
+ uint32_t order;
+ SColumn col;
} SOrder;
typedef struct SGroupbyExpr {
- SArray* columnInfo; // SArray, group by columns information
- bool groupbyTag; // group by tag or column
+ SArray* columnInfo; // SArray, group by columns information
+ bool groupbyTag; // group by tag or column
} SGroupbyExpr;
// the structure for sql function in select clause
typedef struct SSqlExpr {
- char token[TSDB_COL_NAME_LEN]; // original token
- SSchema resSchema;
+ char token[TSDB_COL_NAME_LEN]; // original token
+ SSchema resSchema;
- int32_t numOfCols;
- SColumn* pColumns; // data columns that are required by query
- int32_t interBytes; // inter result buffer size
- int16_t numOfParams; // argument value of each function
- SVariant param[3]; // parameters are not more than 3
+ int32_t numOfCols;
+ SColumn* pColumns; // data columns that are required by query
+ int32_t interBytes; // inter result buffer size
+ int16_t numOfParams; // argument value of each function
+ SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
- struct SSqlExpr base;
- struct tExprNode *pExpr;
+ struct SSqlExpr base;
+ struct tExprNode* pExpr;
} SExprInfo;
typedef struct SStateWindow {
@@ -253,11 +252,11 @@ typedef struct SStateWindow {
} SStateWindow;
typedef struct SSessionWindow {
- int64_t gap; // gap between two session window(in microseconds)
+ int64_t gap; // gap between two session window(in microseconds)
SColumn col;
} SSessionWindow;
-#define QUERY_ASC_FORWARD_STEP 1
+#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
diff --git a/include/common/taosdef.h b/include/common/taosdef.h
index 46c0c98ff0..e5a7a3563e 100644
--- a/include/common/taosdef.h
+++ b/include/common/taosdef.h
@@ -23,7 +23,7 @@ extern "C" {
#include "taos.h"
#include "tdef.h"
-typedef uint64_t tb_uid_t;
+typedef int64_t tb_uid_t;
#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN})
@@ -38,12 +38,12 @@ typedef enum {
} EQType;
typedef enum {
- TSDB_SUPER_TABLE = 1, // super table
- TSDB_CHILD_TABLE = 2, // table created from super table
- TSDB_NORMAL_TABLE = 3, // ordinary table
- TSDB_STREAM_TABLE = 4, // table created from stream computing
- TSDB_TEMP_TABLE = 5, // temp table created by nest query
- TSDB_TABLE_MAX = 6
+ TSDB_SUPER_TABLE = 1, // super table
+ TSDB_CHILD_TABLE = 2, // table created from super table
+ TSDB_NORMAL_TABLE = 3, // ordinary table
+ TSDB_STREAM_TABLE = 4, // table created from stream computing
+ TSDB_TEMP_TABLE = 5, // temp table created by nest query
+ TSDB_TABLE_MAX = 6
} ETableType;
typedef enum {
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 8ebc8b80f2..80a81ff143 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -25,9 +25,9 @@ extern "C" {
#include "taoserror.h"
#include "tarray.h"
#include "tcoding.h"
-#include "trow.h"
#include "thash.h"
#include "tlist.h"
+#include "trow.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
#define TD_MSG_NUMBER_
@@ -69,7 +69,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum {
- HEARTBEAT_TYPE_MQ = 0,
+ HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
// types can be added here
//
@@ -82,7 +82,6 @@ enum {
HEARTBEAT_KEY_MQ_TMP,
};
-
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
@@ -192,14 +191,14 @@ typedef struct {
// Submit message for one table
typedef struct SSubmitBlk {
- uint64_t uid; // table unique id
- int32_t tid; // table id
- int32_t padding; // TODO just for padding here
- int32_t sversion; // data schema version
- int32_t dataLen; // data part length, not including the SSubmitBlk head
- int32_t schemaLen; // schema length, if length is 0, no schema exists
- int16_t numOfRows; // total number of rows in current submit block
- char data[];
+ int64_t uid; // table unique id
+ int32_t tid; // table id
+ int32_t padding; // TODO just for padding here
+ int32_t sversion; // data schema version
+ int32_t dataLen; // data part length, not including the SSubmitBlk head
+ int32_t schemaLen; // schema length, if length is 0, no schema exists
+ int16_t numOfRows; // total number of rows in current submit block
+ char data[];
} SSubmitBlk;
typedef struct {
@@ -226,7 +225,7 @@ typedef struct {
typedef struct {
int32_t totalLen;
int32_t len;
- STSRow *row;
+ STSRow* row;
} SSubmitBlkIter;
typedef struct {
@@ -303,9 +302,9 @@ typedef struct {
} SConnectReq;
typedef struct SEpSet {
- int8_t inUse;
- int8_t numOfEps;
- SEp eps[TSDB_MAX_REPLICA];
+ int8_t inUse;
+ int8_t numOfEps;
+ SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
@@ -328,7 +327,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
}
return buf;
}
-
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
@@ -736,9 +734,9 @@ typedef struct {
} SDnodeCfg;
typedef struct {
- int32_t id;
- int8_t isMnode;
- SEp ep;
+ int32_t id;
+ int8_t isMnode;
+ SEp ep;
} SDnodeEp;
typedef struct {
@@ -818,10 +816,10 @@ typedef struct {
// todo refactor
typedef struct SVgroupInfo {
- int32_t vgId;
- uint32_t hashBegin;
- uint32_t hashEnd;
- SEpSet epset;
+ int32_t vgId;
+ uint32_t hashBegin;
+ uint32_t hashEnd;
+ SEpSet epset;
} SVgroupInfo;
typedef struct {
@@ -1036,7 +1034,7 @@ typedef struct SSubQueryMsg {
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
- uint32_t sqlLen; // the query sql,
+ uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
} SSubQueryMsg;
@@ -1055,7 +1053,6 @@ typedef struct {
uint64_t taskId;
} SQueryContinueReq;
-
typedef struct {
SMsgHead header;
uint64_t sId;
@@ -1256,10 +1253,10 @@ typedef struct {
} SMqTmrMsg;
typedef struct {
- const char* key;
- SArray* lostConsumers; //SArray
- SArray* removedConsumers; //SArray
- SArray* newConsumers; //SArray
+ const char* key;
+ SArray* lostConsumers; // SArray
+ SArray* removedConsumers; // SArray
+ SArray* newConsumers; // SArray
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
@@ -1289,10 +1286,11 @@ _err:
return NULL;
}
-// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization
+// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization /
+// deserialization
typedef struct {
- //SArray* rebSubscribes; //SArray
- SHashObj* rebSubHash; // SHashObj
+ // SArray* rebSubscribes; //SArray
+ SHashObj* rebSubHash; // SHashObj
} SMqDoRebalanceMsg;
#if 0
@@ -1460,9 +1458,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
}
typedef struct SMqHbRsp {
- int8_t status; //idle or not
+ int8_t status; // idle or not
int8_t vnodeChanged;
- int8_t epChanged; // should use new epset
+ int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
@@ -1485,7 +1483,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
}
typedef struct SMqHbOneTopicBatchRsp {
- char topicName[TSDB_TOPIC_FNAME_LEN];
+ char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray
} SMqHbOneTopicBatchRsp;
@@ -1515,8 +1513,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop
}
typedef struct SMqHbBatchRsp {
- int64_t consumerId;
- SArray* batchRsps; // SArray
+ int64_t consumerId;
+ SArray* batchRsps; // SArray
} SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
@@ -1525,7 +1523,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp*
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
- SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
+ SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
@@ -1537,7 +1535,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
- SMqHbOneTopicBatchRsp rsp;
+ SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
@@ -1709,10 +1707,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo*
}
typedef struct SMqHbMsg {
- int32_t status; // ask hb endpoint
- int32_t epoch;
- int64_t consumerId;
- SArray* pTopics; // SArray
+ int32_t status; // ask hb endpoint
+ int32_t epoch;
+ int64_t consumerId;
+ SArray* pTopics; // SArray
} SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
@@ -1745,15 +1743,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef struct {
- int64_t leftForVer;
- int32_t vgId;
- int64_t consumerId;
- char topicName[TSDB_TOPIC_FNAME_LEN];
- char cgroup[TSDB_CONSUMER_GROUP_LEN];
- char* sql;
- char* logicalPlan;
- char* physicalPlan;
- char* qmsg;
+ int64_t leftForVer;
+ int32_t vgId;
+ int64_t consumerId;
+ char topicName[TSDB_TOPIC_FNAME_LEN];
+ char cgroup[TSDB_CONSUMER_GROUP_LEN];
+ char* sql;
+ char* logicalPlan;
+ char* physicalPlan;
+ char* qmsg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
@@ -1784,16 +1782,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
}
typedef struct {
- int64_t leftForVer;
- int32_t vgId;
- int64_t oldConsumerId;
- int64_t newConsumerId;
- //char topicName[TSDB_TOPIC_FNAME_LEN];
- //char cgroup[TSDB_CONSUMER_GROUP_LEN];
- //char* sql;
- //char* logicalPlan;
- //char* physicalPlan;
- //char* qmsg;
+ int64_t leftForVer;
+ int32_t vgId;
+ int64_t oldConsumerId;
+ int64_t newConsumerId;
+ // char topicName[TSDB_TOPIC_FNAME_LEN];
+ // char cgroup[TSDB_CONSUMER_GROUP_LEN];
+ // char* sql;
+ // char* logicalPlan;
+ // char* physicalPlan;
+ // char* qmsg;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
@@ -1802,13 +1800,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
- //tlen += taosEncodeString(buf, pReq->topicName);
- //tlen += taosEncodeString(buf, pReq->cgroup);
- //tlen += taosEncodeString(buf, pReq->sql);
- //tlen += taosEncodeString(buf, pReq->logicalPlan);
- //tlen += taosEncodeString(buf, pReq->physicalPlan);
- //tlen += taosEncodeString(buf, pReq->qmsg);
- //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
+ // tlen += taosEncodeString(buf, pReq->topicName);
+ // tlen += taosEncodeString(buf, pReq->cgroup);
+ // tlen += taosEncodeString(buf, pReq->sql);
+ // tlen += taosEncodeString(buf, pReq->logicalPlan);
+ // tlen += taosEncodeString(buf, pReq->physicalPlan);
+ // tlen += taosEncodeString(buf, pReq->qmsg);
+ // tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
@@ -1817,13 +1815,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
- //buf = taosDecodeStringTo(buf, pReq->topicName);
- //buf = taosDecodeStringTo(buf, pReq->cgroup);
- //buf = taosDecodeString(buf, &pReq->sql);
- //buf = taosDecodeString(buf, &pReq->logicalPlan);
- //buf = taosDecodeString(buf, &pReq->physicalPlan);
- //buf = taosDecodeString(buf, &pReq->qmsg);
- //buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
+ // buf = taosDecodeStringTo(buf, pReq->topicName);
+ // buf = taosDecodeStringTo(buf, pReq->cgroup);
+ // buf = taosDecodeString(buf, &pReq->sql);
+ // buf = taosDecodeString(buf, &pReq->logicalPlan);
+ // buf = taosDecodeString(buf, &pReq->physicalPlan);
+ // buf = taosDecodeString(buf, &pReq->qmsg);
+ // buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
@@ -1845,7 +1843,7 @@ typedef struct {
typedef struct {
uint32_t nCols;
- SSchema *pSchema;
+ SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
@@ -1884,7 +1882,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
- for (int32_t i = 0; i < pSW->nCols; i ++) {
+ for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
@@ -1892,20 +1890,21 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
- pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema));
+ pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
- for (int32_t i = 0; i < pSW->nCols; i ++) {
+
+ for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
}
typedef struct {
- int64_t uid;
- int32_t numOfRows;
- char* colData;
+ int64_t uid;
+ int32_t numOfRows;
+ char* colData;
} SMqTbData;
typedef struct {
@@ -1927,24 +1926,24 @@ typedef struct {
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
- SArray* pBlockData; //SArray
+ SArray* pBlockData; // SArray
} SMqConsumeRsp;
// one req for one vg+topic
typedef struct {
- SMsgHead head;
- //0: commit only, current offset
- //1: consume only, poll next offset
- //2: commit current and consume next offset
- int32_t reqType;
+ SMsgHead head;
+ // 0: commit only, current offset
+ // 1: consume only, poll next offset
+ // 2: commit current and consume next offset
+ int32_t reqType;
- int64_t reqId;
- int64_t consumerId;
- int64_t blockingTime;
- char cgroup[TSDB_CONSUMER_GROUP_LEN];
+ int64_t reqId;
+ int64_t consumerId;
+ int64_t blockingTime;
+ char cgroup[TSDB_CONSUMER_GROUP_LEN];
- int64_t offset;
- char topic[TSDB_TOPIC_FNAME_LEN];
+ int64_t offset;
+ char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq;
typedef struct {
@@ -1954,7 +1953,7 @@ typedef struct {
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
- SArray* vgs; // SArray
+ SArray* vgs; // SArray
} SMqSubTopicEp;
typedef struct {
@@ -1964,9 +1963,7 @@ typedef struct {
SArray* topics; // SArray
} SMqCMGetSubEpRsp;
-static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
- taosArrayDestroy(pSubTopicEp->vgs);
-}
+static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
@@ -1982,7 +1979,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
}
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
- taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp);
+ taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
@@ -2049,4 +2046,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
}
#endif
-#endif /*_TD_COMMON_TAOS_MSG_H_*/
\ No newline at end of file
+#endif /*_TD_COMMON_TAOS_MSG_H_*/
diff --git a/include/nodes/nodes.h b/include/libs/nodes/nodes.h
similarity index 100%
rename from include/nodes/nodes.h
rename to include/libs/nodes/nodes.h
diff --git a/include/nodes/nodesShowStmts.h b/include/libs/nodes/nodesShowStmts.h
similarity index 100%
rename from include/nodes/nodesShowStmts.h
rename to include/libs/nodes/nodesShowStmts.h
diff --git a/include/nodes/querynodes.h b/include/libs/nodes/querynodes.h
similarity index 100%
rename from include/nodes/querynodes.h
rename to include/libs/nodes/querynodes.h
diff --git a/include/util/tcoding.h b/include/util/tcoding.h
index c105ce1ab9..fed9d12cee 100644
--- a/include/util/tcoding.h
+++ b/include/util/tcoding.h
@@ -37,7 +37,7 @@ static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) {
return (int)sizeof(value);
}
-static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) {
+static FORCE_INLINE void *taosDecodeFixedU8(const void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
@@ -51,7 +51,7 @@ static FORCE_INLINE int taosEncodeFixedI8(void **buf, int8_t value) {
return (int)sizeof(value);
}
-static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) {
+static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
*value = ((int8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
@@ -71,7 +71,7 @@ static FORCE_INLINE int taosEncodeFixedU16(void **buf, uint16_t value) {
return (int)sizeof(value);
}
-static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) {
+static FORCE_INLINE void *taosDecodeFixedU16(const void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@@ -87,9 +87,9 @@ static FORCE_INLINE int taosEncodeFixedI16(void **buf, int16_t value) {
return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value));
}
-static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) {
+static FORCE_INLINE void *taosDecodeFixedI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
- void * ret = taosDecodeFixedU16(buf, &tvalue);
+ void *ret = taosDecodeFixedU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
@@ -111,7 +111,7 @@ static FORCE_INLINE int taosEncodeFixedU32(void **buf, uint32_t value) {
return (int)sizeof(value);
}
-static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) {
+static FORCE_INLINE void *taosDecodeFixedU32(const void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@@ -129,9 +129,9 @@ static FORCE_INLINE int taosEncodeFixedI32(void **buf, int32_t value) {
return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value));
}
-static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) {
+static FORCE_INLINE void *taosDecodeFixedI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
- void * ret = taosDecodeFixedU32(buf, &tvalue);
+ void *ret = taosDecodeFixedU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
@@ -158,7 +158,7 @@ static FORCE_INLINE int taosEncodeFixedU64(void **buf, uint64_t value) {
return (int)sizeof(value);
}
-static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) {
+static FORCE_INLINE void *taosDecodeFixedU64(const void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@@ -180,9 +180,9 @@ static FORCE_INLINE int taosEncodeFixedI64(void **buf, int64_t value) {
return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value));
}
-static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) {
+static FORCE_INLINE void *taosDecodeFixedI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
- void * ret = taosDecodeFixedU64(buf, &tvalue);
+ void *ret = taosDecodeFixedU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
@@ -205,7 +205,7 @@ static FORCE_INLINE int taosEncodeVariantU16(void **buf, uint16_t value) {
return i + 1;
}
-static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) {
+static FORCE_INLINE void *taosDecodeVariantU16(const void *buf, uint16_t *value) {
int i = 0;
uint16_t tval = 0;
*value = 0;
@@ -228,9 +228,9 @@ static FORCE_INLINE int taosEncodeVariantI16(void **buf, int16_t value) {
return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value));
}
-static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) {
+static FORCE_INLINE void *taosDecodeVariantI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
- void * ret = taosDecodeVariantU16(buf, &tvalue);
+ void *ret = taosDecodeVariantU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
@@ -253,7 +253,7 @@ static FORCE_INLINE int taosEncodeVariantU32(void **buf, uint32_t value) {
return i + 1;
}
-static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) {
+static FORCE_INLINE void *taosDecodeVariantU32(const void *buf, uint32_t *value) {
int i = 0;
uint32_t tval = 0;
*value = 0;
@@ -276,9 +276,9 @@ static FORCE_INLINE int taosEncodeVariantI32(void **buf, int32_t value) {
return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value));
}
-static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) {
+static FORCE_INLINE void *taosDecodeVariantI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
- void * ret = taosDecodeVariantU32(buf, &tvalue);
+ void *ret = taosDecodeVariantU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
@@ -301,7 +301,7 @@ static FORCE_INLINE int taosEncodeVariantU64(void **buf, uint64_t value) {
return i + 1;
}
-static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) {
+static FORCE_INLINE void *taosDecodeVariantU64(const void *buf, uint64_t *value) {
int i = 0;
uint64_t tval = 0;
*value = 0;
@@ -324,9 +324,9 @@ static FORCE_INLINE int taosEncodeVariantI64(void **buf, int64_t value) {
return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value));
}
-static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
+static FORCE_INLINE void *taosDecodeVariantI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
- void * ret = taosDecodeVariantU64(buf, &tvalue);
+ void *ret = taosDecodeVariantU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
@@ -346,7 +346,7 @@ static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
return tlen;
}
-static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
+static FORCE_INLINE void *taosDecodeString(const void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
@@ -360,7 +360,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
return POINTER_SHIFT(buf, size);
}
-static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
+static FORCE_INLINE void *taosDecodeStringTo(const void *buf, char *value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
@@ -373,7 +373,7 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
// ---- binary
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) {
- int tlen = 0;
+ int tlen = 0;
if (buf != NULL) {
memcpy(*buf, value, valueLen);
@@ -384,8 +384,7 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t
return tlen;
}
-static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) {
-
+static FORCE_INLINE void *taosDecodeBinary(const void *buf, void **value, int32_t valueLen) {
*value = malloc((size_t)valueLen);
if (*value == NULL) return NULL;
memcpy(*value, buf, (size_t)valueLen);
@@ -393,8 +392,7 @@ static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valu
return POINTER_SHIFT(buf, valueLen);
}
-static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) {
-
+static FORCE_INLINE void *taosDecodeBinaryTo(const void *buf, void *value, int32_t valueLen) {
memcpy(value, buf, (size_t)valueLen);
return POINTER_SHIFT(buf, valueLen);
}
diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt
index fbf045b99c..2833b329a7 100644
--- a/source/CMakeLists.txt
+++ b/source/CMakeLists.txt
@@ -3,5 +3,4 @@ add_subdirectory(util)
add_subdirectory(common)
add_subdirectory(libs)
add_subdirectory(client)
-add_subdirectory(dnode)
-add_subdirectory(nodes)
\ No newline at end of file
+add_subdirectory(dnode)
\ No newline at end of file
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 8b1faaef4e..315a632180 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -452,6 +452,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
+int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
+ if (tmq_message == NULL) return 0;
+ SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
+ return pRsp->skipLogNum;
+}
+
void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c
index 7999edbcff..56c14bcfd3 100644
--- a/source/dnode/mnode/impl/src/mndTopic.c
+++ b/source/dnode/mnode/impl/src/mndTopic.c
@@ -60,8 +60,8 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
- int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
- int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
+ int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
+ int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
@@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
- SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
+ SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
@@ -248,12 +248,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
+ /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/
+ /*mndTransAppendRedolog(pTrans, pTopicRaw);*/
+ /*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
+ /*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
+ /*mndTransDrop(pTrans);*/
+ /*return -1;*/
+ /*}*/
+ /*mndTransDrop(pTrans);*/
+ /*return 0;*/
return sdbWrite(pMnode->pSdb, pTopicRaw);
}
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
- SMnode *pMnode = pMsg->pMnode;
- char *msgStr = pMsg->rpcMsg.pCont;
+ SMnode *pMnode = pMsg->pMnode;
+ char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq createTopicReq = {0};
tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq);
@@ -288,13 +297,13 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb);
mndReleaseDb(pMnode, pDb);
- if (code != 0) {
+ if (code != TSDB_CODE_SUCCESS) {
terrno = code;
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1;
}
- return TSDB_CODE_SUCCESS;
+ return 0;
}
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; }
diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h
index 9d396e0771..a516f423bb 100644
--- a/source/dnode/vnode/inc/tq.h
+++ b/source/dnode/vnode/inc/tq.h
@@ -38,7 +38,7 @@ extern "C" {
typedef struct STQ STQ;
// memory allocator provided by vnode
-typedef struct STqMemRef {
+typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 520a4d026a..394076f433 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -35,12 +35,12 @@ typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
-typedef struct STqCfg {
+typedef struct {
// TODO
int32_t reserved;
} STqCfg;
-typedef struct SVnodeCfg {
+typedef struct {
int32_t vgId;
uint64_t dbId;
SDnode *pDnode;
@@ -68,9 +68,9 @@ typedef struct {
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeOpt;
-typedef struct STqReadHandle {
+typedef struct {
int64_t ver;
- uint64_t tbUid;
+ int64_t tbUid;
SHashObj *tbIdHash;
const SSubmitMsg *pMsg;
SSubmitBlk *pBlock;
@@ -200,7 +200,7 @@ int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
-/* ------------------------- TQ QUERY -------------------------- */
+/* ------------------------- TQ READ --------------------------- */
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
@@ -208,12 +208,12 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
pReadHandle->pColIdList = pColIdList;
}
-// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
-// pHandle->tbUid = pTableIdList;
+// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
+// pHandle->tbUid = tbUid;
//}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
- pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
+ pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
return -1;
}
diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h
index 13c0150cd2..344ad992f0 100644
--- a/source/dnode/vnode/src/inc/tqInt.h
+++ b/source/dnode/vnode/src/inc/tqInt.h
@@ -20,6 +20,7 @@
#include "tlog.h"
#include "tq.h"
#include "trpc.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -110,10 +111,11 @@ typedef struct {
char content[];
} STqSerializedHead;
-typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
-typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
+typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
+typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
-typedef struct STqMetaHandle {
+
+typedef struct {
int64_t key;
int64_t offset;
int64_t serializedSize;
@@ -131,6 +133,7 @@ typedef struct STqMetaList {
} STqMetaList;
typedef struct {
+ STQ* pTq;
STqMetaList* bucket[TQ_BUCKET_SIZE];
// a table head
STqMetaList* unpersistHead;
@@ -187,21 +190,22 @@ typedef struct {
char* logicalPlan;
char* physicalPlan;
char* qmsg;
+ int64_t persistedOffset;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
SWalReadHandle* pReadhandle;
-} STqTopicHandle;
+} STqTopic;
typedef struct {
int64_t consumerId;
int64_t epoch;
char cgroup[TSDB_TOPIC_FNAME_LEN];
- SArray* topics; // SArray
-} STqConsumerHandle;
+ SArray* topics; // SArray
+} STqConsumer;
-int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
-const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
+int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
+int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
diff --git a/source/dnode/vnode/src/inc/tqMetaStore.h b/source/dnode/vnode/src/inc/tqMetaStore.h
index 3bf9bb7138..eb203b7117 100644
--- a/source/dnode/vnode/src/inc/tqMetaStore.h
+++ b/source/dnode/vnode/src/inc/tqMetaStore.h
@@ -23,8 +23,8 @@
extern "C" {
#endif
-STqMetaStore* tqStoreOpen(const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, FTqDelete pDeleter,
- int32_t tqConfigFlag);
+STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer,
+ FTqDelete pDeleter, int32_t tqConfigFlag);
int32_t tqStoreClose(STqMetaStore*);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);
diff --git a/source/dnode/vnode/src/inc/tqOffset.h b/source/dnode/vnode/src/inc/tqOffset.h
new file mode 100644
index 0000000000..b58de26f68
--- /dev/null
+++ b/source/dnode/vnode/src/inc/tqOffset.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_TQ_OFFSET_H_
+#define _TD_TQ_OFFSET_H_
+
+#include "tqInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct STqOffsetCfg STqOffsetCfg;
+typedef struct STqOffsetStore STqOffsetStore;
+
+STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
+void STqOffsetClose(STqOffsetStore*);
+
+int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
+int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
+int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
+int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_TQ_OFFSET_H_*/
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 8ffedfebe4..252bc889f5 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -50,7 +50,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
// TODO: error code of buffer pool
}
#endif
- pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
+ pTq->tqMeta =
+ tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
if (pTq->tqMeta == NULL) {
free(pTq);
#if 0
@@ -76,19 +77,89 @@ int tqPushMsg(STQ* pTq, void* p, int64_t version) {
return 0;
}
-int tqCommit(STQ* pTq) {
- // do nothing
- return 0;
+int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); }
+
+int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
+ return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->logicalPlan) + strlen(pTopic->physicalPlan) +
+ strlen(pTopic->qmsg) + sizeof(int64_t) * 3;
}
-int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
- int32_t num = taosArrayGetSize(pConsumer->topics);
- int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN +
- num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
+int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
+ int num = taosArrayGetSize(pConsumer->topics);
+ int32_t sz = 0;
+ for (int i = 0; i < num; i++) {
+ STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
+ sz += tqGetTopicHandleSize(pTopic);
+ }
+ return sz;
+}
+
+static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
+ int32_t tlen = 0;
+ tlen += taosEncodeString(buf, pTopic->topicName);
+ /*tlen += taosEncodeString(buf, pTopic->sql);*/
+ /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
+ /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
+ tlen += taosEncodeString(buf, pTopic->qmsg);
+ tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);
+ tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);
+ tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);
+ return tlen;
+}
+
+static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
+ buf = taosDecodeStringTo(buf, pTopic->topicName);
+ /*buf = taosDecodeString(buf, &pTopic->sql);*/
+ /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
+ /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
+ buf = taosDecodeString(buf, &pTopic->qmsg);
+ buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);
+ buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);
+ buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);
+ return buf;
+}
+
+static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
+ int32_t sz;
+
+ int32_t tlen = 0;
+ tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
+ tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
+ tlen += taosEncodeString(buf, pConsumer->cgroup);
+ sz = taosArrayGetSize(pConsumer->topics);
+ tlen += taosEncodeFixedI32(buf, sz);
+ for (int32_t i = 0; i < sz; i++) {
+ STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
+ tlen += tEncodeSTqTopic(buf, pTopic);
+ }
+ return tlen;
+}
+
+static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
+ int32_t sz;
+
+ buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
+ buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
+ buf = taosDecodeStringTo(buf, pConsumer->cgroup);
+ buf = taosDecodeFixedI32(buf, &sz);
+ pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
+ if (pConsumer->topics == NULL) return NULL;
+ for (int32_t i = 0; i < sz; i++) {
+ STqTopic pTopic;
+ buf = tDecodeSTqTopic(buf, &pTopic);
+ taosArrayPush(pConsumer->topics, &pTopic);
+ }
+ return buf;
+}
+
+int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
+ int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);
+
if (sz > (*ppHead)->ssize) {
- void* tmpPtr = realloc(*ppHead, sz);
+ void* tmpPtr = realloc(*ppHead, sizeof(STqSerializedHead) + sz);
if (tmpPtr == NULL) {
free(*ppHead);
+ terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
*ppHead = tmpPtr;
@@ -96,45 +167,155 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead**
}
void* ptr = (*ppHead)->content;
- *(int64_t*)ptr = pConsumer->consumerId;
- ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
- *(int64_t*)ptr = pConsumer->epoch;
- ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
- memcpy(ptr, pConsumer->topics, TSDB_TOPIC_FNAME_LEN);
- ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
- *(int32_t*)ptr = num;
- ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
- for (int32_t i = 0; i < num; i++) {
- STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
- memcpy(ptr, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
- ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
- *(int64_t*)ptr = pTopic->committedOffset;
- POINTER_SHIFT(ptr, sizeof(int64_t));
+ void* abuf = ptr;
+ tEncodeSTqConsumer(&abuf, pConsumer);
+
+ return 0;
+}
+
+int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
+ const void* str = pHead->content;
+ *ppConsumer = calloc(1, sizeof(STqConsumer));
+ if (*ppConsumer == NULL) {
+ terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
+ return -1;
+ }
+ if (tDecodeSTqConsumer(str, *ppConsumer) == NULL) {
+ terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
+ return -1;
+ }
+ STqConsumer* pConsumer = *ppConsumer;
+ int32_t sz = taosArrayGetSize(pConsumer->topics);
+ for (int32_t i = 0; i < sz; i++) {
+ STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
+ pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
+ if (pTopic->pReadhandle == NULL) {
+ ASSERT(false);
+ }
+ for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
+ pTopic->buffer.output[i].status = 0;
+ STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
+ SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
+ pTopic->buffer.output[i].pReadHandle = pReadHandle;
+ pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
+ }
}
return 0;
}
-const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) {
- STqConsumerHandle* pConsumer = *ppConsumer;
- const void* ptr = pHead->content;
- pConsumer->consumerId = *(int64_t*)ptr;
- ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
- pConsumer->epoch = *(int64_t*)ptr;
- ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
- memcpy(pConsumer->cgroup, ptr, TSDB_TOPIC_FNAME_LEN);
- ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
- int32_t sz = *(int32_t*)ptr;
- ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
- pConsumer->topics = taosArrayInit(sz, sizeof(STqTopicHandle));
- for (int32_t i = 0; i < sz; i++) {
- /*STqTopicHandle* topicHandle = */
- /*taosArrayPush(pConsumer->topics, );*/
+int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
+ SMqConsumeReq* pReq = pMsg->pCont;
+ int64_t consumerId = pReq->consumerId;
+ int64_t fetchOffset = pReq->offset;
+ /*int64_t blockingTime = pReq->blockingTime;*/
+
+ SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
+
+ STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
+ if (pConsumer == NULL) {
+ pMsg->pCont = NULL;
+ pMsg->contLen = 0;
+ pMsg->code = -1;
+ rpcSendResponse(pMsg);
+ return 0;
}
- return NULL;
+ int sz = taosArrayGetSize(pConsumer->topics);
+ ASSERT(sz == 1);
+ STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
+ ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
+ ASSERT(pConsumer->consumerId == consumerId);
+
+ if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
+ pTopic->committedOffset = pReq->offset;
+ pMsg->pCont = NULL;
+ pMsg->contLen = 0;
+ pMsg->code = 0;
+ rpcSendResponse(pMsg);
+ return 0;
+ }
+
+ if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
+ pTopic->committedOffset = pReq->offset - 1;
+ }
+
+ rsp.committedOffset = pTopic->committedOffset;
+ rsp.reqOffset = pReq->offset;
+ rsp.skipLogNum = 0;
+
+ SWalHead* pHead;
+ while (1) {
+ int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
+ if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
+ break;
+ }
+ pHead = pTopic->pReadhandle->pHead;
+ if (pHead->head.msgType == TDMT_VND_SUBMIT) {
+ SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
+ qTaskInfo_t task = pTopic->buffer.output[pos].task;
+ qSetStreamInput(task, pCont);
+ SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
+ while (1) {
+ SSDataBlock* pDataBlock;
+ uint64_t ts;
+ if (qExecTask(task, &pDataBlock, &ts) < 0) {
+ ASSERT(false);
+ }
+ if (pDataBlock == NULL) {
+ fetchOffset++;
+ rsp.skipLogNum++;
+ break;
+ }
+
+ taosArrayPush(pRes, pDataBlock);
+ rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
+ rsp.rspOffset = fetchOffset;
+ pTopic->currentOffset = fetchOffset;
+
+ rsp.numOfTopics = 1;
+ rsp.pBlockData = pRes;
+
+ int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
+ void* buf = rpcMallocCont(tlen);
+ if (buf == NULL) {
+ pMsg->code = -1;
+ return -1;
+ }
+
+ void* abuf = buf;
+ tEncodeSMqConsumeRsp(&abuf, &rsp);
+ taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
+ pMsg->pCont = buf;
+ pMsg->contLen = tlen;
+ pMsg->code = 0;
+ rpcSendResponse(pMsg);
+ return 0;
+ }
+ } else {
+ fetchOffset++;
+ rsp.skipLogNum++;
+ }
+ }
+
+ int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
+ void* buf = rpcMallocCont(tlen);
+ if (buf == NULL) {
+ pMsg->code = -1;
+ return -1;
+ }
+
+ void* abuf = buf;
+ tEncodeSMqConsumeRsp(&abuf, &rsp);
+ rsp.pBlockData = NULL;
+ pMsg->pCont = buf;
+ pMsg->contLen = tlen;
+ pMsg->code = 0;
+ rpcSendResponse(pMsg);
+ return 0;
}
-int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
+#if 0
+int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
@@ -145,7 +326,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
- STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
+ STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
@@ -156,10 +337,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) {
- STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
+ STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
// TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
- /*ASSERT(false);*/
+ ASSERT(false);
continue;
}
@@ -195,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
+ printf("read offset %ld\n", fetchOffset);
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
@@ -203,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
- break;
}
rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
+ printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
@@ -218,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
+ printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont);
// SArray
@@ -237,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
+ pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
@@ -268,28 +452,27 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
+
if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL;
- /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/
- /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/
- /*tDeleteSSDataBlock(pBlock);*/
- /*}*/
- /*taosArrayDestroy(rsp.pBlockData);*/
}
+
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
+#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
tDecodeSMqMVRebReq(msg, &req);
- STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
+ STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer);
+ pConsumer->consumerId = req.newConsumerId;
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
@@ -302,19 +485,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
tDecodeSMqSetCVgReq(msg, &req);
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
- STqConsumerHandle* pConsumer = calloc(1, sizeof(STqConsumerHandle));
+ STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
if (pConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
strcpy(pConsumer->cgroup, req.cgroup);
- pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
+ pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pConsumer->consumerId = req.consumerId;
pConsumer->epoch = 0;
- STqTopicHandle* pTopic = calloc(1, sizeof(STqTopicHandle));
+ STqTopic* pTopic = calloc(1, sizeof(STqTopic));
if (pTopic == NULL) {
+ taosArrayDestroy(pConsumer->topics);
free(pConsumer);
return -1;
}
@@ -330,6 +514,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) {
+ ASSERT(false);
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
@@ -344,143 +529,3 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
-
-STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
- STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
- if (pReadHandle == NULL) {
- return NULL;
- }
- pReadHandle->pVnodeMeta = pMeta;
- pReadHandle->pMsg = NULL;
- pReadHandle->ver = -1;
- pReadHandle->pColIdList = NULL;
- pReadHandle->sver = -1;
- pReadHandle->pSchema = NULL;
- pReadHandle->pSchemaWrapper = NULL;
- return pReadHandle;
-}
-
-void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
- pReadHandle->pMsg = pMsg;
- pMsg->length = htonl(pMsg->length);
- pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
- tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
- pReadHandle->ver = ver;
- memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
-}
-
-bool tqNextDataBlock(STqReadHandle* pHandle) {
- while (1) {
- if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
- return false;
- }
- if (pHandle->pBlock == NULL) return false;
-
- pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
- /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
- ASSERT(pHandle->tbIdHash);
- void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
- if (ret != NULL) {
- pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
- pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
- pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
- pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
- pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
- return true;
- }
- }
- return false;
-}
-
-int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
- /*int32_t sversion = pHandle->pBlock->sversion;*/
- /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
- pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
- pBlockInfo->rows = pHandle->pBlock->numOfRows;
- pBlockInfo->uid = pHandle->pBlock->uid;
- return 0;
-}
-
-SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
- /*int32_t sversion = pHandle->pBlock->sversion;*/
- // TODO set to real sversion
- int32_t sversion = 0;
- if (pHandle->sver != sversion) {
- pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
-
- tb_uid_t quid;
- STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
- if (pTbCfg->type == META_CHILD_TABLE) {
- quid = pTbCfg->ctbCfg.suid;
- } else {
- quid = pHandle->pBlock->uid;
- }
- pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
- pHandle->sver = sversion;
- }
-
- STSchema* pTschema = pHandle->pSchema;
- SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
-
- int32_t numOfRows = pHandle->pBlock->numOfRows;
- int32_t numOfCols = pHandle->pSchema->numOfCols;
- int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
-
- // TODO: stable case
- if (colNumNeed > pSchemaWrapper->nCols) {
- colNumNeed = pSchemaWrapper->nCols;
- }
-
- SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
- if (pArray == NULL) {
- return NULL;
- }
-
- int j = 0;
- for (int32_t i = 0; i < colNumNeed; i++) {
- int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
- while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
- j++;
- }
- SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
- SColumnInfoData colInfo = {0};
- int sz = numOfRows * pColSchema->bytes;
- colInfo.info.bytes = pColSchema->bytes;
- colInfo.info.colId = colId;
- colInfo.info.type = pColSchema->type;
-
- colInfo.pData = calloc(1, sz);
- if (colInfo.pData == NULL) {
- // TODO free
- taosArrayDestroy(pArray);
- return NULL;
- }
- taosArrayPush(pArray, &colInfo);
- }
-
- STSRowIter iter = {0};
- tdSTSRowIterInit(&iter, pTschema);
- STSRow* row;
- // int32_t kvIdx = 0;
- int32_t curRow = 0;
- tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
- while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
- tdSTSRowIterReset(&iter, row);
- // get all wanted col of that block
- for (int32_t i = 0; i < colNumNeed; i++) {
- SColumnInfoData* pColData = taosArrayGet(pArray, i);
- STColumn* pCol = schemaColAt(pTschema, i);
- // TODO
- ASSERT(pCol->colId == pColData->info.colId);
- // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
- SCellVal sVal = {0};
- if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
- // TODO: reach end
- break;
- }
- memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
- }
- curRow++;
- }
- return pArray;
-}
diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c
index 121be98572..d40cc2294f 100644
--- a/source/dnode/vnode/src/tq/tqMetaStore.c
+++ b/source/dnode/vnode/src/tq/tqMetaStore.c
@@ -68,20 +68,21 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
return lseek(fd, offset, SEEK_SET);
}
-STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
- int32_t tqConfigFlag) {
- STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
+STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, FTqDeserialize deserializer,
+ FTqDelete deleter, int32_t tqConfigFlag) {
+ STqMetaStore* pMeta = calloc(1, sizeof(STqMetaStore));
if (pMeta == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
}
- memset(pMeta, 0, sizeof(STqMetaStore));
+ pMeta->pTq = pTq;
// concat data file name and index file name
size_t pathLen = strlen(path);
pMeta->dirPath = malloc(pathLen + 1);
if (pMeta->dirPath == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
+ free(pMeta);
return NULL;
}
strcpy(pMeta->dirPath, path);
@@ -103,12 +104,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
pMeta->idxFd = idxFd;
- pMeta->unpersistHead = malloc(sizeof(STqMetaList));
+ pMeta->unpersistHead = calloc(1, sizeof(STqMetaList));
if (pMeta->unpersistHead == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
}
- memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
strcpy(name, path);
@@ -145,12 +145,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
ASSERT(idxBuf.head.writeOffset == idxRead);
// loop read every entry
for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
- STqMetaList* pNode = malloc(sizeof(STqMetaList));
+ STqMetaList* pNode = calloc(1, sizeof(STqMetaList));
if (pNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
}
- memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_SET);
@@ -169,25 +168,25 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
if (serializedObj->action == TQ_ACTION_INUSE) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
- pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
+ pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
} else if (serializedObj->action == TQ_ACTION_INTXN) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
- pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
+ pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
} else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
- pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
+ pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if (ptr->ssize != sizeof(STqSerializedHead)) {
- pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
+ pMeta->pDeserializer(pTq, ptr, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
@@ -302,7 +301,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
pSHead->ver = TQ_SVER;
pSHead->checksum = 0;
pSHead->ssize = sizeof(STqSerializedHead);
- int allocatedSize = sizeof(STqSerializedHead);
+ /*int allocatedSize = sizeof(STqSerializedHead);*/
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf);
@@ -417,14 +416,14 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
pNode = pNode->next;
}
}
- STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
+ STqMetaList* pNewNode = calloc(1, sizeof(STqMetaList));
if (pNewNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
- memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInUse = value;
+ pNewNode->next = pMeta->bucket[bucketKey];
// put into unpersist list
pNewNode->unpersistPrev = pMeta->unpersistHead;
pNewNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
@@ -489,12 +488,11 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
pNode = pNode->next;
}
}
- STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
+ STqMetaList* pNewNode = calloc(1, sizeof(STqMetaList));
if (pNewNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
- memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey];
diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c
new file mode 100644
index 0000000000..4115cb7313
--- /dev/null
+++ b/source/dnode/vnode/src/tq/tqOffset.c
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#define _DEFAULT_SOURCE
+
+#include "tqOffset.h"
+
+enum ETqOffsetPersist {
+ TQ_OFFSET_PERSIST__LAZY = 1,
+ TQ_OFFSET_PERSIST__EAGER,
+};
+
+struct STqOffsetCfg {
+ int8_t persistPolicy;
+};
+
+struct STqOffsetStore {
+ STqOffsetCfg cfg;
+ SHashObj* pHash; // SHashObj
+};
+
+STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
+ STqOffsetStore* pStore = malloc(sizeof(STqOffsetStore));
+ if (pStore == NULL) {
+ return NULL;
+ }
+ memcpy(&pStore->cfg, pCfg, sizeof(STqOffsetCfg));
+ pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
+ return pStore;
+}
+
diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c
new file mode 100644
index 0000000000..9f76c6b76a
--- /dev/null
+++ b/source/dnode/vnode/src/tq/tqRead.c
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#define _DEFAULT_SOURCE
+
+#include "vnode.h"
+
+STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
+ STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
+ if (pReadHandle == NULL) {
+ return NULL;
+ }
+ pReadHandle->pVnodeMeta = pMeta;
+ pReadHandle->pMsg = NULL;
+ pReadHandle->ver = -1;
+ pReadHandle->pColIdList = NULL;
+ pReadHandle->sver = -1;
+ pReadHandle->pSchema = NULL;
+ pReadHandle->pSchemaWrapper = NULL;
+ return pReadHandle;
+}
+
+void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
+ pReadHandle->pMsg = pMsg;
+ pMsg->length = htonl(pMsg->length);
+ pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
+ tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
+ pReadHandle->ver = ver;
+ memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
+}
+
+bool tqNextDataBlock(STqReadHandle* pHandle) {
+ while (1) {
+ if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
+ return false;
+ }
+ if (pHandle->pBlock == NULL) return false;
+
+ pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
+ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
+ ASSERT(pHandle->tbIdHash);
+ void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
+ if (ret != NULL) {
+ /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
+ pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
+ pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
+ pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
+ pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
+ pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
+ return true;
+ } else {
+ /*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
+ }
+ }
+ return false;
+}
+
+int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
+ /*int32_t sversion = pHandle->pBlock->sversion;*/
+ /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
+ pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
+ pBlockInfo->rows = pHandle->pBlock->numOfRows;
+ pBlockInfo->uid = pHandle->pBlock->uid;
+ return 0;
+}
+
+SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
+ /*int32_t sversion = pHandle->pBlock->sversion;*/
+ // TODO set to real sversion
+ int32_t sversion = 0;
+ if (pHandle->sver != sversion) {
+ pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
+
+ tb_uid_t quid;
+ STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
+ if (pTbCfg->type == META_CHILD_TABLE) {
+ quid = pTbCfg->ctbCfg.suid;
+ } else {
+ quid = pHandle->pBlock->uid;
+ }
+ pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
+ pHandle->sver = sversion;
+ }
+
+ STSchema* pTschema = pHandle->pSchema;
+ SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
+
+ int32_t numOfRows = pHandle->pBlock->numOfRows;
+ int32_t numOfCols = pHandle->pSchema->numOfCols;
+ int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
+
+ // TODO: stable case
+ if (colNumNeed > pSchemaWrapper->nCols) {
+ colNumNeed = pSchemaWrapper->nCols;
+ }
+
+ SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
+ if (pArray == NULL) {
+ return NULL;
+ }
+
+ int j = 0;
+ for (int32_t i = 0; i < colNumNeed; i++) {
+ int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
+ while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
+ j++;
+ }
+ SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
+ SColumnInfoData colInfo = {0};
+ int sz = numOfRows * pColSchema->bytes;
+ colInfo.info.bytes = pColSchema->bytes;
+ colInfo.info.colId = colId;
+ colInfo.info.type = pColSchema->type;
+
+ colInfo.pData = calloc(1, sz);
+ if (colInfo.pData == NULL) {
+ // TODO free
+ taosArrayDestroy(pArray);
+ return NULL;
+ }
+ taosArrayPush(pArray, &colInfo);
+ }
+
+ STSRowIter iter = {0};
+ tdSTSRowIterInit(&iter, pTschema);
+ STSRow* row;
+ // int32_t kvIdx = 0;
+ int32_t curRow = 0;
+ tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
+ while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
+ tdSTSRowIterReset(&iter, row);
+ // get all wanted col of that block
+ for (int32_t i = 0; i < colNumNeed; i++) {
+ SColumnInfoData* pColData = taosArrayGet(pArray, i);
+ STColumn* pCol = schemaColAt(pTschema, i);
+ // TODO
+ ASSERT(pCol->colId == pColData->info.colId);
+ // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
+ SCellVal sVal = {0};
+ if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
+ // TODO: reach end
+ break;
+ }
+ memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
+ }
+ curRow++;
+ }
+ return pArray;
+}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c
index 9b44e14c9b..26c313f421 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCommit.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c
@@ -18,7 +18,7 @@
#define TSDB_MAX_SUBBLOCKS 8
typedef struct {
- STable * pTable;
+ STable *pTable;
SSkipListIterator *pIter;
} SCommitIter;
@@ -34,11 +34,11 @@ typedef struct {
bool isLFileSame;
TSKEY minKey;
TSKEY maxKey;
- SArray * aBlkIdx; // SBlockIdx array
- STable * pTable;
- SArray * aSupBlk; // Table super-block array
- SArray * aSubBlk; // table sub-block array
- SDataCols * pDataCols;
+ SArray *aBlkIdx; // SBlockIdx array
+ STable *pTable;
+ SArray *aSupBlk; // Table super-block array
+ SArray *aSubBlk; // table sub-block array
+ SDataCols *pDataCols;
} SCommitH;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
@@ -90,7 +90,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
- STsdbFS * pfs = REPO_FS(pRepo);
+ STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
@@ -135,12 +135,13 @@ int tsdbPrepareCommit(STsdb *pTsdb) {
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
+ return 0;
}
int tsdbCommit(STsdb *pRepo) {
STsdbMemTable *pMem = pRepo->imem;
SCommitH commith = {0};
- SDFileSet * pSet = NULL;
+ SDFileSet *pSet = NULL;
int fid;
if (pRepo->imem == NULL) return 0;
@@ -303,7 +304,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
static int tsdbNextCommitFid(SCommitH *pCommith) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int fid = TSDB_IVLD_FID;
@@ -336,7 +337,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
}
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);
@@ -391,12 +392,12 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
static int tsdbCreateCommitIters(SCommitH *pCommith) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
- STsdbMemTable * pMem = pRepo->imem;
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdbMemTable *pMem = pRepo->imem;
SSkipListIterator *pSlIter;
- SCommitIter * pCommitIter;
- SSkipListNode * pNode;
- STbData * pTbData;
+ SCommitIter *pCommitIter;
+ SSkipListNode *pNode;
+ STbData *pTbData;
pCommith->niters = SL_SIZE(pMem->pSlIdx);
pCommith->iters = (SCommitIter *)calloc(pCommith->niters, sizeof(SCommitIter));
@@ -452,7 +453,7 @@ static void tsdbResetCommitFile(SCommitH *pCommith) {
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
@@ -583,7 +584,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
- SBlock * pBlock;
+ SBlock *pBlock;
memset(pIdx, 0, sizeof(*pIdx));
@@ -1130,7 +1131,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
- STsdbCfg * pCfg = REPO_CFG(pRepo);
+ STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
@@ -1147,7 +1148,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
- SDataCol * pDataCol = pDataCols->cols + ncol;
+ SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
@@ -1188,7 +1189,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
- SDataCol * pDataCol = pDataCols->cols + ncol;
+ SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
@@ -1212,7 +1213,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
}
#endif
- void * tptr;
+ void *tptr;
// Make room
if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
@@ -1278,7 +1279,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
- tsdbDebug("vgId:%d uid:%"PRId64" a block of data is written to file %s, offset %" PRId64
+ tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
@@ -1294,9 +1295,9 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
- SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
+ SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
- STable * pTable = TSDB_COMMIT_TABLE(pCommih);
+ STable *pTable = TSDB_COMMIT_TABLE(pCommih);
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) {
@@ -1316,11 +1317,11 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
- STsdbCfg * pCfg = REPO_CFG(pRepo);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdbCfg *pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
- SDFile * pDFile;
+ SDFile *pDFile;
bool isLast;
SBlock block;
@@ -1349,16 +1350,16 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
- STsdbCfg * pCfg = REPO_CFG(pRepo);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdbCfg *pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
- SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
+ SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit;
int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID;
SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock;
- SDFile * pDFile;
+ SDFile *pDFile;
if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey;
@@ -1474,10 +1475,10 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlock block;
- SDFile * pDFile;
+ SDFile *pDFile;
bool isLast;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
@@ -1598,7 +1599,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
}
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
- STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
+ STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c
index 4d048bc6e2..46e60329ae 100644
--- a/source/dnode/vnode/src/vnd/vnodeWrite.c
+++ b/source/dnode/vnode/src/vnd/vnodeWrite.c
@@ -16,19 +16,6 @@
#include "tq.h"
#include "vnd.h"
-#if 0
-int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
- switch (pMsg->msgType) {
- case TDMT_VND_MQ_SET_CUR:
- if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) {
- // TODO: handle error
- }
- break;
- }
- return 0;
-}
-#endif
-
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg;
@@ -36,11 +23,12 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version
- void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+ void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++;
- taosEncodeFixedU64(&pBuf, ver);
+ taosEncodeFixedI64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {
+ /*ASSERT(false);*/
// TODO: handle error
}
}
@@ -55,7 +43,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq;
- void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
+ void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
@@ -64,8 +52,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
memcpy(ptr, pMsg->pCont, pMsg->contLen);
// todo: change the interface here
- uint64_t ver;
- taosDecodeFixedU64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
+ int64_t ver;
+ taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
// TODO: handle error
}
@@ -132,7 +120,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} break;
case TDMT_VND_MQ_REB: {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
-
}
} break;
default:
diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt
index 1d23f333b2..049b69991f 100644
--- a/source/libs/CMakeLists.txt
+++ b/source/libs/CMakeLists.txt
@@ -13,3 +13,4 @@ add_subdirectory(function)
add_subdirectory(qcom)
add_subdirectory(qworker)
add_subdirectory(tfs)
+add_subdirectory(nodes)
\ No newline at end of file
diff --git a/source/nodes/CMakeLists.txt b/source/libs/nodes/CMakeLists.txt
similarity index 84%
rename from source/nodes/CMakeLists.txt
rename to source/libs/nodes/CMakeLists.txt
index b30534f3f2..9a826e034c 100644
--- a/source/nodes/CMakeLists.txt
+++ b/source/libs/nodes/CMakeLists.txt
@@ -2,7 +2,7 @@ aux_source_directory(src NODES_SRC)
add_library(nodes STATIC ${NODES_SRC})
target_include_directories(
nodes
- PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes"
+ PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/nodes"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
diff --git a/source/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c
similarity index 100%
rename from source/nodes/src/nodesCloneFuncs.c
rename to source/libs/nodes/src/nodesCloneFuncs.c
diff --git a/source/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
similarity index 100%
rename from source/nodes/src/nodesCodeFuncs.c
rename to source/libs/nodes/src/nodesCodeFuncs.c
diff --git a/source/nodes/src/nodesEqualFuncs.c b/source/libs/nodes/src/nodesEqualFuncs.c
similarity index 100%
rename from source/nodes/src/nodesEqualFuncs.c
rename to source/libs/nodes/src/nodesEqualFuncs.c
diff --git a/source/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c
similarity index 100%
rename from source/nodes/src/nodesTraverseFuncs.c
rename to source/libs/nodes/src/nodesTraverseFuncs.c
diff --git a/source/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
similarity index 100%
rename from source/nodes/src/nodesUtilFuncs.c
rename to source/libs/nodes/src/nodesUtilFuncs.c
diff --git a/source/nodes/test/CMakeLists.txt b/source/libs/nodes/test/CMakeLists.txt
similarity index 100%
rename from source/nodes/test/CMakeLists.txt
rename to source/libs/nodes/test/CMakeLists.txt
diff --git a/source/nodes/test/nodesTest.cpp b/source/libs/nodes/test/nodesTest.cpp
similarity index 100%
rename from source/nodes/test/nodesTest.cpp
rename to source/libs/nodes/test/nodesTest.cpp