fix uninitialized
This commit is contained in:
parent
7f84cc1f79
commit
573aad5cc1
|
@ -2,6 +2,8 @@ build/
|
|||
compile_commands.json
|
||||
.cache
|
||||
.ycm_extra_conf.py
|
||||
.tasks
|
||||
.vimspector.json
|
||||
.vscode/
|
||||
.idea/
|
||||
cmake-build-debug/
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
|
@ -65,7 +66,7 @@ int32_t init_env() {
|
|||
taos_free_result(pRes);
|
||||
|
||||
|
||||
char* sql = "select * from st1";
|
||||
const char* sql = "select * from st1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
/*if (taos_errno(pRes) != 0) {*/
|
||||
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
|
||||
|
@ -112,14 +113,20 @@ void basic_consume_loop(tmq_t *tmq,
|
|||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t cnt = 0;
|
||||
clock_t startTime = clock();
|
||||
while (running) {
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmq) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
/*msg_process(tmqmessage);*/
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clock_t endTime = clock();
|
||||
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
|
@ -163,6 +170,6 @@ int main() {
|
|||
code = init_env();
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
sync_consume_loop(tmq, topic_list);
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
/*sync_consume_loop(tmq, topic_list);*/
|
||||
}
|
||||
|
|
|
@ -213,7 +213,7 @@ typedef struct tmq_message_t tmq_message_t;
|
|||
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
|
||||
|
||||
DLL_EXPORT tmq_list_t *tmq_list_new();
|
||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *);
|
||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
||||
|
||||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
|
|
|
@ -38,22 +38,22 @@ extern "C" {
|
|||
|
||||
#define TSWAP(a, b, c) \
|
||||
do { \
|
||||
typeof(a) __tmp = (a); \
|
||||
__typeof(a) __tmp = (a); \
|
||||
(a) = (b); \
|
||||
(b) = __tmp; \
|
||||
} while (0)
|
||||
|
||||
#define TMAX(a, b) \
|
||||
({ \
|
||||
typeof(a) __a = (a); \
|
||||
typeof(b) __b = (b); \
|
||||
__typeof(a) __a = (a); \
|
||||
__typeof(b) __b = (b); \
|
||||
(__a > __b) ? __a : __b; \
|
||||
})
|
||||
|
||||
#define TMIN(a, b) \
|
||||
({ \
|
||||
typeof(a) __a = (a); \
|
||||
typeof(b) __b = (b); \
|
||||
__typeof(a) __a = (a); \
|
||||
__typeof(b) __b = (b); \
|
||||
(__a < __b) ? __a : __b; \
|
||||
})
|
||||
#endif
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "parser.h"
|
||||
|
@ -146,7 +148,7 @@ tmq_list_t* tmq_list_new() {
|
|||
return ptr;
|
||||
}
|
||||
|
||||
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
|
||||
int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
|
||||
if (ptr->cnt >= ptr->tot - 1) return -1;
|
||||
ptr->elems[ptr->cnt] = strdup(src);
|
||||
ptr->cnt++;
|
||||
|
@ -366,7 +368,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
.logicalPlan = "no logic plan",
|
||||
.logicalPlan = (char*)"no logic plan",
|
||||
};
|
||||
|
||||
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
|
||||
|
@ -512,7 +514,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return -1;
|
||||
}
|
||||
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
|
||||
/*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/
|
||||
/*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||
if (pRsp->numOfTopics == 0) {
|
||||
/*printf("no data\n");*/
|
||||
free(pRsp);
|
||||
|
|
|
@ -630,14 +630,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
#define TSWAP(a, b, c) \
|
||||
do { \
|
||||
typeof(a) __tmp = (a); \
|
||||
(a) = (b); \
|
||||
(b) = __tmp; \
|
||||
} while (0)
|
||||
|
||||
|
||||
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
|
@ -54,13 +55,14 @@ void mndCleanupConsumer(SMnode *pMnode) {}
|
|||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void *buf = malloc(tlen);
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void *abuf = buf;
|
||||
|
@ -88,6 +90,7 @@ CM_ENCODE_OVER:
|
|||
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
|
||||
|
@ -106,7 +109,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t len;
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
||||
void *buf = malloc(len);
|
||||
buf = malloc(len);
|
||||
if (buf == NULL) goto CM_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndConsumer.h"
|
||||
|
@ -372,13 +373,14 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
|
|||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
||||
|
||||
void *buf = malloc(tlen);
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto SUB_ENCODE_OVER;
|
||||
|
||||
void *abuf = buf;
|
||||
|
@ -406,6 +408,7 @@ SUB_ENCODE_OVER:
|
|||
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
||||
|
@ -425,7 +428,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t tlen;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
||||
void *buf = malloc(tlen + 1);
|
||||
buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto SUB_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
#ifndef _TD_TQ_INT_H_
|
||||
#define _TD_TQ_INT_H_
|
||||
|
||||
#include "tq.h"
|
||||
#include "meta.h"
|
||||
#include "tlog.h"
|
||||
#include "tq.h"
|
||||
#include "trpc.h"
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -26,29 +26,48 @@ extern "C" {
|
|||
|
||||
extern int32_t tqDebugFlag;
|
||||
|
||||
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
|
||||
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
|
||||
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
|
||||
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
|
||||
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
|
||||
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
|
||||
#define tqFatal(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqError(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqWarn(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqInfo(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("TQ ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqDebug(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqTrace(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
// create persistent storage for meta info such as consuming offset
|
||||
// return value > 0: cgId
|
||||
// return value <= 0: error code
|
||||
// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
|
||||
// create ring buffer in memory and load consuming offset
|
||||
// int tqOpenTCGroup(STQ*, const char* topic, int cgId);
|
||||
// destroy ring buffer and persist consuming offset
|
||||
// int tqCloseTCGroup(STQ*, const char* topic, int cgId);
|
||||
// delete persistent storage for meta info
|
||||
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||
|
||||
//int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
||||
//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
|
||||
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||
|
||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -12,19 +12,12 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tcompare.h"
|
||||
#include "tqInt.h"
|
||||
#include "tqMetaStore.h"
|
||||
|
||||
// static
|
||||
// read next version data
|
||||
//
|
||||
// send to fetch queue
|
||||
//
|
||||
// handle management message
|
||||
//
|
||||
|
||||
int tqInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
|
|
@ -170,7 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
}
|
||||
|
||||
if (pRead->pHead->head.version != ver) {
|
||||
/*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/
|
||||
wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
|
@ -178,7 +178,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
|
||||
code = walValidBodyCksum(pRead->pHead);
|
||||
if (code != 0) {
|
||||
/*wError("unexpected wal log version: checksum not passed");*/
|
||||
wError("unexpected wal log version: checksum not passed");
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
#include "osString.h"
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tbuffer.h"
|
||||
#include "exception.h"
|
||||
#include "os.h"
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#include "tnote.h"
|
||||
#include "tutil.h"
|
||||
#include "ulog.h"
|
||||
//#include "zlib.h"
|
||||
|
||||
#define MAX_LOGLINE_SIZE (1000)
|
||||
#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10)
|
||||
|
|
|
@ -1,3 +1,20 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tpagedfile.h"
|
||||
#include "thash.h"
|
||||
#include "stddef.h"
|
||||
|
|
Loading…
Reference in New Issue