Merge remote-tracking branch 'origin/3.0' into feature/privilege

This commit is contained in:
Shengliang Guan 2022-02-15 16:50:15 +08:00
commit 2e6bdc2b6b
32 changed files with 928 additions and 611 deletions

View File

@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) {
tmqShowMsg(message);
}
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -44,29 +42,28 @@ int32_t init_env() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
const char* sql = "select * from st1";
const char* sql = "select * from tu1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
@ -95,7 +92,7 @@ tmq_t* build_consumer() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
return NULL;
return NULL;
}
tmq_list_t* build_topic_list() {
@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
void basic_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq,
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
/*} else {*/
/*break;*/
}
}
@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
@ -148,15 +143,47 @@ void sync_consume_loop(tmq_t *tmq,
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(tmq, NULL, 0);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
}
}
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
@ -168,8 +195,9 @@ void sync_consume_loop(tmq_t *tmq,
int main() {
int code;
code = init_env();
tmq_t* tmq = build_consumer();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
perf_loop(tmq, topic_list);
/*basic_consume_loop(tmq, topic_list);*/
/*sync_consume_loop(tmq, topic_list);*/
}

View File

@ -16,8 +16,8 @@
#ifndef TDENGINE_TAOS_H
#define TDENGINE_TAOS_H
#include <stdint.h>
#include <stdbool.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
@ -31,26 +31,26 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_JSON 17 // json
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_JSON 17 // json
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary
typedef enum {
TSDB_OPTION_LOCALE,
@ -63,9 +63,9 @@ typedef enum {
typedef enum {
TSDB_SML_UNKNOWN_PROTOCOL = 0,
TSDB_SML_LINE_PROTOCOL = 1,
TSDB_SML_TELNET_PROTOCOL = 2,
TSDB_SML_JSON_PROTOCOL = 3,
TSDB_SML_LINE_PROTOCOL = 1,
TSDB_SML_TELNET_PROTOCOL = 2,
TSDB_SML_JSON_PROTOCOL = 3,
} TSDB_SML_PROTOCOL_TYPE;
typedef enum {
@ -79,28 +79,28 @@ typedef enum {
} TSDB_SML_TIMESTAMP_TYPE;
typedef struct taosField {
char name[65];
int8_t type;
int32_t bytes;
char name[65];
int8_t type;
int32_t bytes;
} TAOS_FIELD;
#ifdef _TD_GO_DLL_
#define DLL_EXPORT __declspec(dllexport)
#define DLL_EXPORT __declspec(dllexport)
#else
#define DLL_EXPORT
#define DLL_EXPORT
#endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length; // unused
uintptr_t *length;
int * is_null;
int buffer_type;
void *buffer;
uintptr_t buffer_length; // unused
uintptr_t *length;
int *is_null;
int is_unsigned; // unused
int * error; // unused
int is_unsigned; // unused
int *error; // unused
union {
int64_t ts;
int8_t b;
@ -113,22 +113,23 @@ typedef struct TAOS_BIND {
unsigned char *bin;
char *nchar;
} u;
unsigned int allocated;
unsigned int allocated;
} TAOS_BIND;
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void *buffer;
uintptr_t buffer_length;
int32_t *length;
char *is_null;
int num;
int buffer_type;
void *buffer;
uintptr_t buffer_length;
int32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
@ -136,62 +137,63 @@ const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
typedef void (*__taos_sub_fn_t)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval);
typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
@ -215,10 +217,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
@ -227,7 +229,7 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
@ -251,8 +253,9 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
//temporary used function for demo only
void tmqShowMsg(tmq_message_t* tmq_message);
// temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#ifdef __cplusplus
}

View File

@ -17,35 +17,35 @@
#define TDENGINE_COMMON_H
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tmsg.h"
#include "tvariant.h"
//typedef struct STimeWindow {
// TSKEY skey;
// TSKEY ekey;
//} STimeWindow;
// typedef struct STimeWindow {
// TSKEY skey;
// TSKEY ekey;
// } STimeWindow;
//typedef struct {
// int32_t dataLen;
// char name[TSDB_TABLE_FNAME_LEN];
// char *data;
//} STagData;
// typedef struct {
// int32_t dataLen;
// char name[TSDB_TABLE_FNAME_LEN];
// char *data;
// } STagData;
//typedef struct SSchema {
// uint8_t type;
// char name[TSDB_COL_NAME_LEN];
// int16_t colId;
// int16_t bytes;
//} SSchema;
// typedef struct SSchema {
// uint8_t type;
// char name[TSDB_COL_NAME_LEN];
// int16_t colId;
// int16_t bytes;
// } SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
SArray* pGroupList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SColumnDataAgg {
@ -67,25 +67,25 @@ typedef struct SDataBlockInfo {
typedef struct SConstantItem {
SColumnInfo info;
int32_t startRow; // run-length-encoding to save the space for multiple rows
int32_t startRow; // run-length-encoding to save the space for multiple rows
int32_t endRow;
SVariant value;
} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
} SSDataBlock;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed
char *nullbitmap;//
char *pData; // the corresponding block data in memory
SColumnInfo info; // TODO filter info needs to be removed
char* nullbitmap; //
char* pData; // the corresponding block data in memory
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
@ -110,7 +110,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo
return tlen;
}
static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
@ -127,7 +127,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
}
return buf;
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
@ -146,7 +146,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i);
SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
tlen += tEncodeDataBlock(buf, pBlock);
}
return tlen;
@ -179,19 +179,18 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
return;
}
//int32_t numOfOutput = pBlock->info.numOfCols;
// int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < sz; ++i) {
for (int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
//tfree(pBlock);
// tfree(pBlock);
}
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
if (pRsp->schemas) {
if (pRsp->schemas->nCols) {
@ -199,53 +198,53 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
}
free(pRsp->schemas);
}
taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL;
//for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//tDeleteSSDataBlock(pDataBlock);
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL;
// for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
// SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
// tDeleteSSDataBlock(pDataBlock);
//}
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SColumn {
uint64_t uid;
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
SColumnInfo info;
uint64_t uid;
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
SColumnInfo info;
} SColumn;
typedef struct SLimit {
int64_t limit;
int64_t offset;
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SOrder {
uint32_t order;
SColumn col;
uint32_t order;
SColumn col;
} SOrder;
typedef struct SGroupbyExpr {
SArray* columnInfo; // SArray<SColIndex>, group by columns information
bool groupbyTag; // group by tag or column
SArray* columnInfo; // SArray<SColIndex>, group by columns information
bool groupbyTag; // group by tag or column
} SGroupbyExpr;
// the structure for sql function in select clause
typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
struct SSqlExpr base;
struct tExprNode *pExpr;
struct SSqlExpr base;
struct tExprNode* pExpr;
} SExprInfo;
typedef struct SStateWindow {
@ -253,11 +252,11 @@ typedef struct SStateWindow {
} SStateWindow;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int64_t gap; // gap between two session window(in microseconds)
SColumn col;
} SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

View File

@ -23,7 +23,7 @@ extern "C" {
#include "taos.h"
#include "tdef.h"
typedef uint64_t tb_uid_t;
typedef int64_t tb_uid_t;
#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN})
@ -38,12 +38,12 @@ typedef enum {
} EQType;
typedef enum {
TSDB_SUPER_TABLE = 1, // super table
TSDB_CHILD_TABLE = 2, // table created from super table
TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_TABLE_MAX = 6
TSDB_SUPER_TABLE = 1, // super table
TSDB_CHILD_TABLE = 2, // table created from super table
TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_TABLE_MAX = 6
} ETableType;
typedef enum {

View File

@ -25,9 +25,9 @@ extern "C" {
#include "taoserror.h"
#include "tarray.h"
#include "tcoding.h"
#include "trow.h"
#include "thash.h"
#include "tlist.h"
#include "trow.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
#define TD_MSG_NUMBER_
@ -69,7 +69,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
// types can be added here
//
@ -82,7 +82,6 @@ enum {
HEARTBEAT_KEY_MQ_TMP,
};
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
@ -192,14 +191,14 @@ typedef struct {
// Submit message for one table
typedef struct SSubmitBlk {
uint64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
char data[];
int64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
typedef struct {
@ -226,7 +225,7 @@ typedef struct {
typedef struct {
int32_t totalLen;
int32_t len;
STSRow *row;
STSRow* row;
} SSubmitBlkIter;
typedef struct {
@ -303,9 +302,9 @@ typedef struct {
} SConnectReq;
typedef struct SEpSet {
int8_t inUse;
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
int8_t inUse;
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
@ -328,7 +327,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
@ -736,9 +734,9 @@ typedef struct {
} SDnodeCfg;
typedef struct {
int32_t id;
int8_t isMnode;
SEp ep;
int32_t id;
int8_t isMnode;
SEp ep;
} SDnodeEp;
typedef struct {
@ -818,10 +816,10 @@ typedef struct {
// todo refactor
typedef struct SVgroupInfo {
int32_t vgId;
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epset;
int32_t vgId;
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epset;
} SVgroupInfo;
typedef struct {
@ -1036,7 +1034,7 @@ typedef struct SSubQueryMsg {
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t sqlLen; // the query sql,
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
} SSubQueryMsg;
@ -1055,7 +1053,6 @@ typedef struct {
uint64_t taskId;
} SQueryContinueReq;
typedef struct {
SMsgHead header;
uint64_t sId;
@ -1256,10 +1253,10 @@ typedef struct {
} SMqTmrMsg;
typedef struct {
const char* key;
SArray* lostConsumers; //SArray<int64_t>
SArray* removedConsumers; //SArray<int64_t>
SArray* newConsumers; //SArray<int64_t>
const char* key;
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
@ -1289,10 +1286,11 @@ _err:
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization /
// deserialization
typedef struct {
//SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
// SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
#if 0
@ -1460,9 +1458,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
}
typedef struct SMqHbRsp {
int8_t status; //idle or not
int8_t status; // idle or not
int8_t vnodeChanged;
int8_t epChanged; // should use new epset
int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
@ -1485,7 +1483,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
}
typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN];
char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;
@ -1515,8 +1513,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop
}
typedef struct SMqHbBatchRsp {
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
@ -1525,7 +1523,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp*
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
@ -1537,7 +1535,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp;
SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
@ -1709,10 +1707,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo*
}
typedef struct SMqHbMsg {
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
@ -1745,15 +1743,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
char* qmsg;
int64_t leftForVer;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
char* qmsg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
@ -1784,16 +1782,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
//char topicName[TSDB_TOPIC_FNAME_LEN];
//char cgroup[TSDB_CONSUMER_GROUP_LEN];
//char* sql;
//char* logicalPlan;
//char* physicalPlan;
//char* qmsg;
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
// char topicName[TSDB_TOPIC_FNAME_LEN];
// char cgroup[TSDB_CONSUMER_GROUP_LEN];
// char* sql;
// char* logicalPlan;
// char* physicalPlan;
// char* qmsg;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
@ -1802,13 +1800,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
//tlen += taosEncodeString(buf, pReq->topicName);
//tlen += taosEncodeString(buf, pReq->cgroup);
//tlen += taosEncodeString(buf, pReq->sql);
//tlen += taosEncodeString(buf, pReq->logicalPlan);
//tlen += taosEncodeString(buf, pReq->physicalPlan);
//tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
// tlen += taosEncodeString(buf, pReq->topicName);
// tlen += taosEncodeString(buf, pReq->cgroup);
// tlen += taosEncodeString(buf, pReq->sql);
// tlen += taosEncodeString(buf, pReq->logicalPlan);
// tlen += taosEncodeString(buf, pReq->physicalPlan);
// tlen += taosEncodeString(buf, pReq->qmsg);
// tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
@ -1817,13 +1815,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
//buf = taosDecodeStringTo(buf, pReq->topicName);
//buf = taosDecodeStringTo(buf, pReq->cgroup);
//buf = taosDecodeString(buf, &pReq->sql);
//buf = taosDecodeString(buf, &pReq->logicalPlan);
//buf = taosDecodeString(buf, &pReq->physicalPlan);
//buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
// buf = taosDecodeStringTo(buf, pReq->topicName);
// buf = taosDecodeStringTo(buf, pReq->cgroup);
// buf = taosDecodeString(buf, &pReq->sql);
// buf = taosDecodeString(buf, &pReq->logicalPlan);
// buf = taosDecodeString(buf, &pReq->physicalPlan);
// buf = taosDecodeString(buf, &pReq->qmsg);
// buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
@ -1845,7 +1843,7 @@ typedef struct {
typedef struct {
uint32_t nCols;
SSchema *pSchema;
SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
@ -1884,7 +1882,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i ++) {
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
@ -1892,20 +1890,21 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema));
pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i ++) {
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
}
typedef struct {
int64_t uid;
int32_t numOfRows;
char* colData;
int64_t uid;
int32_t numOfRows;
char* colData;
} SMqTbData;
typedef struct {
@ -1927,24 +1926,24 @@ typedef struct {
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SArray* pBlockData; //SArray<SSDataBlock>
SArray* pBlockData; // SArray<SSDataBlock>
} SMqConsumeRsp;
// one req for one vg+topic
typedef struct {
SMsgHead head;
//0: commit only, current offset
//1: consume only, poll next offset
//2: commit current and consume next offset
int32_t reqType;
SMsgHead head;
// 0: commit only, current offset
// 1: consume only, poll next offset
// 2: commit current and consume next offset
int32_t reqType;
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq;
typedef struct {
@ -1954,7 +1953,7 @@ typedef struct {
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
SArray* vgs; // SArray<SMqSubVgEp>
} SMqSubTopicEp;
typedef struct {
@ -1964,9 +1963,7 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
@ -1982,7 +1979,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
}
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp);
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
@ -2049,4 +2046,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
}
#endif
#endif /*_TD_COMMON_TAOS_MSG_H_*/
#endif /*_TD_COMMON_TAOS_MSG_H_*/

View File

@ -37,7 +37,7 @@ static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) {
static FORCE_INLINE void *taosDecodeFixedU8(const void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
@ -51,7 +51,7 @@ static FORCE_INLINE int taosEncodeFixedI8(void **buf, int8_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) {
static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
*value = ((int8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
@ -71,7 +71,7 @@ static FORCE_INLINE int taosEncodeFixedU16(void **buf, uint16_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeFixedU16(const void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@ -87,9 +87,9 @@ static FORCE_INLINE int taosEncodeFixedI16(void **buf, int16_t value) {
return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) {
static FORCE_INLINE void *taosDecodeFixedI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeFixedU16(buf, &tvalue);
void *ret = taosDecodeFixedU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
@ -111,7 +111,7 @@ static FORCE_INLINE int taosEncodeFixedU32(void **buf, uint32_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) {
static FORCE_INLINE void *taosDecodeFixedU32(const void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@ -129,9 +129,9 @@ static FORCE_INLINE int taosEncodeFixedI32(void **buf, int32_t value) {
return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) {
static FORCE_INLINE void *taosDecodeFixedI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeFixedU32(buf, &tvalue);
void *ret = taosDecodeFixedU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
@ -158,7 +158,7 @@ static FORCE_INLINE int taosEncodeFixedU64(void **buf, uint64_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeFixedU64(const void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
@ -180,9 +180,9 @@ static FORCE_INLINE int taosEncodeFixedI64(void **buf, int64_t value) {
return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) {
static FORCE_INLINE void *taosDecodeFixedI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeFixedU64(buf, &tvalue);
void *ret = taosDecodeFixedU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
@ -205,7 +205,7 @@ static FORCE_INLINE int taosEncodeVariantU16(void **buf, uint16_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeVariantU16(const void *buf, uint16_t *value) {
int i = 0;
uint16_t tval = 0;
*value = 0;
@ -228,9 +228,9 @@ static FORCE_INLINE int taosEncodeVariantI16(void **buf, int16_t value) {
return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) {
static FORCE_INLINE void *taosDecodeVariantI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeVariantU16(buf, &tvalue);
void *ret = taosDecodeVariantU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
@ -253,7 +253,7 @@ static FORCE_INLINE int taosEncodeVariantU32(void **buf, uint32_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) {
static FORCE_INLINE void *taosDecodeVariantU32(const void *buf, uint32_t *value) {
int i = 0;
uint32_t tval = 0;
*value = 0;
@ -276,9 +276,9 @@ static FORCE_INLINE int taosEncodeVariantI32(void **buf, int32_t value) {
return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) {
static FORCE_INLINE void *taosDecodeVariantI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeVariantU32(buf, &tvalue);
void *ret = taosDecodeVariantU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
@ -301,7 +301,7 @@ static FORCE_INLINE int taosEncodeVariantU64(void **buf, uint64_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeVariantU64(const void *buf, uint64_t *value) {
int i = 0;
uint64_t tval = 0;
*value = 0;
@ -324,9 +324,9 @@ static FORCE_INLINE int taosEncodeVariantI64(void **buf, int64_t value) {
return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
static FORCE_INLINE void *taosDecodeVariantI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeVariantU64(buf, &tvalue);
void *ret = taosDecodeVariantU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
@ -346,7 +346,7 @@ static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
return tlen;
}
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
static FORCE_INLINE void *taosDecodeString(const void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
@ -360,7 +360,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
return POINTER_SHIFT(buf, size);
}
static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
static FORCE_INLINE void *taosDecodeStringTo(const void *buf, char *value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
@ -373,7 +373,7 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
// ---- binary
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) {
int tlen = 0;
int tlen = 0;
if (buf != NULL) {
memcpy(*buf, value, valueLen);
@ -384,8 +384,7 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t
return tlen;
}
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) {
static FORCE_INLINE void *taosDecodeBinary(const void *buf, void **value, int32_t valueLen) {
*value = malloc((size_t)valueLen);
if (*value == NULL) return NULL;
memcpy(*value, buf, (size_t)valueLen);
@ -393,8 +392,7 @@ static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valu
return POINTER_SHIFT(buf, valueLen);
}
static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) {
static FORCE_INLINE void *taosDecodeBinaryTo(const void *buf, void *value, int32_t valueLen) {
memcpy(value, buf, (size_t)valueLen);
return POINTER_SHIFT(buf, valueLen);
}

View File

@ -3,5 +3,4 @@ add_subdirectory(util)
add_subdirectory(common)
add_subdirectory(libs)
add_subdirectory(client)
add_subdirectory(dnode)
add_subdirectory(nodes)
add_subdirectory(dnode)

View File

@ -452,6 +452,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
return pRsp->skipLogNum;
}
void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;

View File

@ -60,8 +60,8 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
@ -248,12 +248,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
/*mndTransDrop(pTrans);*/
/*return -1;*/
/*}*/
/*mndTransDrop(pTrans);*/
/*return 0;*/
return sdbWrite(pMnode->pSdb, pTopicRaw);
}
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq createTopicReq = {0};
tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq);
@ -288,13 +297,13 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb);
mndReleaseDb(pMnode, pDb);
if (code != 0) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1;
}
return TSDB_CODE_SUCCESS;
return 0;
}
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; }

View File

@ -38,7 +38,7 @@ extern "C" {
typedef struct STQ STQ;
// memory allocator provided by vnode
typedef struct STqMemRef {
typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;

View File

@ -35,12 +35,12 @@ typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef struct STqCfg {
typedef struct {
// TODO
int32_t reserved;
} STqCfg;
typedef struct SVnodeCfg {
typedef struct {
int32_t vgId;
uint64_t dbId;
SDnode *pDnode;
@ -68,9 +68,9 @@ typedef struct {
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeOpt;
typedef struct STqReadHandle {
typedef struct {
int64_t ver;
uint64_t tbUid;
int64_t tbUid;
SHashObj *tbIdHash;
const SSubmitMsg *pMsg;
SSubmitBlk *pBlock;
@ -200,7 +200,7 @@ int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ QUERY -------------------------- */
/* ------------------------- TQ READ --------------------------- */
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
@ -208,12 +208,12 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
pReadHandle->pColIdList = pColIdList;
}
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
// pHandle->tbUid = pTableIdList;
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
// pHandle->tbUid = tbUid;
//}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
return -1;
}

View File

@ -20,6 +20,7 @@
#include "tlog.h"
#include "tq.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -110,10 +111,11 @@ typedef struct {
char content[];
} STqSerializedHead;
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
typedef struct STqMetaHandle {
typedef struct {
int64_t key;
int64_t offset;
int64_t serializedSize;
@ -131,6 +133,7 @@ typedef struct STqMetaList {
} STqMetaList;
typedef struct {
STQ* pTq;
STqMetaList* bucket[TQ_BUCKET_SIZE];
// a table head
STqMetaList* unpersistHead;
@ -187,21 +190,22 @@ typedef struct {
char* logicalPlan;
char* physicalPlan;
char* qmsg;
int64_t persistedOffset;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
SWalReadHandle* pReadhandle;
} STqTopicHandle;
} STqTopic;
typedef struct {
int64_t consumerId;
int64_t epoch;
char cgroup[TSDB_TOPIC_FNAME_LEN];
SArray* topics; // SArray<STqClientTopic>
} STqConsumerHandle;
SArray* topics; // SArray<STqTopicHandle>
} STqConsumer;
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }

View File

@ -23,8 +23,8 @@
extern "C" {
#endif
STqMetaStore* tqStoreOpen(const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, FTqDelete pDeleter,
int32_t tqConfigFlag);
STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer,
FTqDelete pDeleter, int32_t tqConfigFlag);
int32_t tqStoreClose(STqMetaStore*);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TQ_OFFSET_H_
#define _TD_TQ_OFFSET_H_
#include "tqInt.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STqOffsetCfg STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore;
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
void STqOffsetClose(STqOffsetStore*);
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TQ_OFFSET_H_*/

View File

@ -50,7 +50,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
// TODO: error code of buffer pool
}
#endif
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
pTq->tqMeta =
tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
if (pTq->tqMeta == NULL) {
free(pTq);
#if 0
@ -76,19 +77,89 @@ int tqPushMsg(STQ* pTq, void* p, int64_t version) {
return 0;
}
int tqCommit(STQ* pTq) {
// do nothing
return 0;
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); }
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->logicalPlan) + strlen(pTopic->physicalPlan) +
strlen(pTopic->qmsg) + sizeof(int64_t) * 3;
}
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
int32_t num = taosArrayGetSize(pConsumer->topics);
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN +
num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
int num = taosArrayGetSize(pConsumer->topics);
int32_t sz = 0;
for (int i = 0; i < num; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
sz += tqGetTopicHandleSize(pTopic);
}
return sz;
}
static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopic->topicName);
/*tlen += taosEncodeString(buf, pTopic->sql);*/
/*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen += taosEncodeString(buf, pTopic->qmsg);
tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);
tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);
tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);
return tlen;
}
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
buf = taosDecodeStringTo(buf, pTopic->topicName);
/*buf = taosDecodeString(buf, &pTopic->sql);*/
/*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf = taosDecodeString(buf, &pTopic->qmsg);
buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);
buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);
buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);
return buf;
}
static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
int32_t sz;
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
tlen += taosEncodeString(buf, pConsumer->cgroup);
sz = taosArrayGetSize(pConsumer->topics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
tlen += tEncodeSTqTopic(buf, pTopic);
}
return tlen;
}
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
if (pConsumer->topics == NULL) return NULL;
for (int32_t i = 0; i < sz; i++) {
STqTopic pTopic;
buf = tDecodeSTqTopic(buf, &pTopic);
taosArrayPush(pConsumer->topics, &pTopic);
}
return buf;
}
int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);
if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
void* tmpPtr = realloc(*ppHead, sizeof(STqSerializedHead) + sz);
if (tmpPtr == NULL) {
free(*ppHead);
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
*ppHead = tmpPtr;
@ -96,45 +167,155 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead**
}
void* ptr = (*ppHead)->content;
*(int64_t*)ptr = pConsumer->consumerId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pConsumer->epoch;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
memcpy(ptr, pConsumer->topics, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
*(int32_t*)ptr = num;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for (int32_t i = 0; i < num; i++) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
memcpy(ptr, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
*(int64_t*)ptr = pTopic->committedOffset;
POINTER_SHIFT(ptr, sizeof(int64_t));
void* abuf = ptr;
tEncodeSTqConsumer(&abuf, pConsumer);
return 0;
}
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
const void* str = pHead->content;
*ppConsumer = calloc(1, sizeof(STqConsumer));
if (*ppConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
if (tDecodeSTqConsumer(str, *ppConsumer) == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
STqConsumer* pConsumer = *ppConsumer;
int32_t sz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < sz; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) {
ASSERT(false);
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
}
}
return 0;
}
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) {
STqConsumerHandle* pConsumer = *ppConsumer;
const void* ptr = pHead->content;
pConsumer->consumerId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
pConsumer->epoch = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
memcpy(pConsumer->cgroup, ptr, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
int32_t sz = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
pConsumer->topics = taosArrayInit(sz, sizeof(STqTopicHandle));
for (int32_t i = 0; i < sz; i++) {
/*STqTopicHandle* topicHandle = */
/*taosArrayPush(pConsumer->topics, );*/
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset;
/*int64_t blockingTime = pReq->blockingTime;*/
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
return NULL;
int sz = taosArrayGetSize(pConsumer->topics);
ASSERT(sz == 1);
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
ASSERT(pConsumer->consumerId == consumerId);
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset - 1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
SWalHead* pHead;
while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
break;
}
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
fetchOffset++;
rsp.skipLogNum++;
break;
}
taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
} else {
fetchOffset++;
rsp.skipLogNum++;
}
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
#if 0
int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
@ -145,7 +326,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
@ -156,10 +337,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
// TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
/*ASSERT(false);*/
ASSERT(false);
continue;
}
@ -195,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
@ -203,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
break;
}
rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
@ -218,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont);
// SArray<SSDataBlock>
@ -237,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
@ -268,28 +452,27 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL;
/*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/
/*tDeleteSSDataBlock(pBlock);*/
/*}*/
/*taosArrayDestroy(rsp.pBlockData);*/
}
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
tDecodeSMqMVRebReq(msg, &req);
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer);
pConsumer->consumerId = req.newConsumerId;
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
@ -302,19 +485,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
tDecodeSMqSetCVgReq(msg, &req);
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
STqConsumerHandle* pConsumer = calloc(1, sizeof(STqConsumerHandle));
STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
if (pConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
pConsumer->consumerId = req.consumerId;
pConsumer->epoch = 0;
STqTopicHandle* pTopic = calloc(1, sizeof(STqTopicHandle));
STqTopic* pTopic = calloc(1, sizeof(STqTopic));
if (pTopic == NULL) {
taosArrayDestroy(pConsumer->topics);
free(pConsumer);
return -1;
}
@ -330,6 +514,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) {
ASSERT(false);
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
@ -344,143 +529,3 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) {
return NULL;
}
pReadHandle->pVnodeMeta = pMeta;
pReadHandle->pMsg = NULL;
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
return pReadHandle;
}
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true;
}
}
return false;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->pBlock->uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion;
}
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows;
int32_t numOfCols = pHandle->pSchema->numOfCols;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
// TODO: stable case
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
}
int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) {
int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++;
}
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = calloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
taosArrayPush(pArray, &colInfo);
}
STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
// int32_t kvIdx = 0;
int32_t curRow = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i);
// TODO
ASSERT(pCol->colId == pColData->info.colId);
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
// TODO: reach end
break;
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
}
curRow++;
}
return pArray;
}

View File

@ -68,20 +68,21 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
return lseek(fd, offset, SEEK_SET);
}
STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
int32_t tqConfigFlag) {
STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, FTqDeserialize deserializer,
FTqDelete deleter, int32_t tqConfigFlag) {
STqMetaStore* pMeta = calloc(1, sizeof(STqMetaStore));
if (pMeta == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
}
memset(pMeta, 0, sizeof(STqMetaStore));
pMeta->pTq = pTq;
// concat data file name and index file name
size_t pathLen = strlen(path);
pMeta->dirPath = malloc(pathLen + 1);
if (pMeta->dirPath == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
free(pMeta);
return NULL;
}
strcpy(pMeta->dirPath, path);
@ -103,12 +104,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(STqMetaList));
pMeta->unpersistHead = calloc(1, sizeof(STqMetaList));
if (pMeta->unpersistHead == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
}
memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
strcpy(name, path);
@ -145,12 +145,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
ASSERT(idxBuf.head.writeOffset == idxRead);
// loop read every entry
for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
STqMetaList* pNode = malloc(sizeof(STqMetaList));
STqMetaList* pNode = calloc(1, sizeof(STqMetaList));
if (pNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
}
memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_SET);
@ -169,25 +168,25 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
if (serializedObj->action == TQ_ACTION_INUSE) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
} else if (serializedObj->action == TQ_ACTION_INTXN) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
} else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if (ptr->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
pMeta->pDeserializer(pTq, ptr, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
@ -302,7 +301,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
pSHead->ver = TQ_SVER;
pSHead->checksum = 0;
pSHead->ssize = sizeof(STqSerializedHead);
int allocatedSize = sizeof(STqSerializedHead);
/*int allocatedSize = sizeof(STqSerializedHead);*/
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf);
@ -417,14 +416,14 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
pNode = pNode->next;
}
}
STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
STqMetaList* pNewNode = calloc(1, sizeof(STqMetaList));
if (pNewNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInUse = value;
pNewNode->next = pMeta->bucket[bucketKey];
// put into unpersist list
pNewNode->unpersistPrev = pMeta->unpersistHead;
pNewNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
@ -489,12 +488,11 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
pNode = pNode->next;
}
}
STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
STqMetaList* pNewNode = calloc(1, sizeof(STqMetaList));
if (pNewNode == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey];

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tqOffset.h"
enum ETqOffsetPersist {
TQ_OFFSET_PERSIST__LAZY = 1,
TQ_OFFSET_PERSIST__EAGER,
};
struct STqOffsetCfg {
int8_t persistPolicy;
};
struct STqOffsetStore {
STqOffsetCfg cfg;
SHashObj* pHash; // SHashObj<subscribeKey, offset>
};
STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
STqOffsetStore* pStore = malloc(sizeof(STqOffsetStore));
if (pStore == NULL) {
return NULL;
}
memcpy(&pStore->cfg, pCfg, sizeof(STqOffsetCfg));
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
return pStore;
}

View File

@ -0,0 +1,160 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnode.h"
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) {
return NULL;
}
pReadHandle->pVnodeMeta = pMeta;
pReadHandle->pMsg = NULL;
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
return pReadHandle;
}
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true;
} else {
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->pBlock->uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion;
}
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows;
int32_t numOfCols = pHandle->pSchema->numOfCols;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
// TODO: stable case
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
}
int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) {
int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++;
}
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = calloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
taosArrayPush(pArray, &colInfo);
}
STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
// int32_t kvIdx = 0;
int32_t curRow = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i);
// TODO
ASSERT(pCol->colId == pColData->info.colId);
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
// TODO: reach end
break;
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
}
curRow++;
}
return pArray;
}

View File

@ -18,7 +18,7 @@
#define TSDB_MAX_SUBBLOCKS 8
typedef struct {
STable * pTable;
STable *pTable;
SSkipListIterator *pIter;
} SCommitIter;
@ -34,11 +34,11 @@ typedef struct {
bool isLFileSame;
TSKEY minKey;
TSKEY maxKey;
SArray * aBlkIdx; // SBlockIdx array
STable * pTable;
SArray * aSupBlk; // Table super-block array
SArray * aSubBlk; // table sub-block array
SDataCols * pDataCols;
SArray *aBlkIdx; // SBlockIdx array
STable *pTable;
SArray *aSupBlk; // Table super-block array
SArray *aSubBlk; // table sub-block array
SDataCols *pDataCols;
} SCommitH;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
@ -90,7 +90,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS * pfs = REPO_FS(pRepo);
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
@ -135,12 +135,13 @@ int tsdbPrepareCommit(STsdb *pTsdb) {
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
return 0;
}
int tsdbCommit(STsdb *pRepo) {
STsdbMemTable *pMem = pRepo->imem;
SCommitH commith = {0};
SDFileSet * pSet = NULL;
SDFileSet *pSet = NULL;
int fid;
if (pRepo->imem == NULL) return 0;
@ -303,7 +304,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int fid = TSDB_IVLD_FID;
@ -336,7 +337,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
}
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);
@ -391,12 +392,12 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbMemTable * pMem = pRepo->imem;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbMemTable *pMem = pRepo->imem;
SSkipListIterator *pSlIter;
SCommitIter * pCommitIter;
SSkipListNode * pNode;
STbData * pTbData;
SCommitIter *pCommitIter;
SSkipListNode *pNode;
STbData *pTbData;
pCommith->niters = SL_SIZE(pMem->pSlIdx);
pCommith->iters = (SCommitIter *)calloc(pCommith->niters, sizeof(SCommitIter));
@ -452,7 +453,7 @@ static void tsdbResetCommitFile(SCommitH *pCommith) {
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
@ -583,7 +584,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
SBlock * pBlock;
SBlock *pBlock;
memset(pIdx, 0, sizeof(*pIdx));
@ -1130,7 +1131,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
@ -1147,7 +1148,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
@ -1188,7 +1189,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol;
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
@ -1212,7 +1213,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
}
#endif
void * tptr;
void *tptr;
// Make room
if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
@ -1278,7 +1279,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
tsdbDebug("vgId:%d uid:%"PRId64" a block of data is written to file %s, offset %" PRId64
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
@ -1294,9 +1295,9 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
STable *pTable = TSDB_COMMIT_TABLE(pCommih);
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) {
@ -1316,11 +1317,11 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
SDFile * pDFile;
SDFile *pDFile;
bool isLast;
SBlock block;
@ -1349,16 +1350,16 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit;
int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID;
SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock;
SDFile * pDFile;
SDFile *pDFile;
if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey;
@ -1474,10 +1475,10 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlock block;
SDFile * pDFile;
SDFile *pDFile;
bool isLast;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
@ -1598,7 +1599,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
}
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;

View File

@ -16,19 +16,6 @@
#include "tq.h"
#include "vnd.h"
#if 0
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_VND_MQ_SET_CUR:
if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) {
// TODO: handle error
}
break;
}
return 0;
}
#endif
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg;
@ -36,11 +23,12 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version
void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver);
taosEncodeFixedI64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {
/*ASSERT(false);*/
// TODO: handle error
}
}
@ -55,7 +43,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
@ -64,8 +52,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
memcpy(ptr, pMsg->pCont, pMsg->contLen);
// todo: change the interface here
uint64_t ver;
taosDecodeFixedU64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
int64_t ver;
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
// TODO: handle error
}
@ -132,7 +120,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} break;
case TDMT_VND_MQ_REB: {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
}
} break;
default:

View File

@ -13,3 +13,4 @@ add_subdirectory(function)
add_subdirectory(qcom)
add_subdirectory(qworker)
add_subdirectory(tfs)
add_subdirectory(nodes)

View File

@ -2,7 +2,7 @@ aux_source_directory(src NODES_SRC)
add_library(nodes STATIC ${NODES_SRC})
target_include_directories(
nodes
PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/nodes"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(