fix:[TD-31097]remove useless file & process return value
This commit is contained in:
parent
c1d899aa1a
commit
48a898df7d
|
@ -1,66 +0,0 @@
|
||||||
// A simple demo for asynchronous subscription.
|
|
||||||
// compile with:
|
|
||||||
// gcc -o subscribe_demo subscribe_demo.c -ltaos
|
|
||||||
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <taos.h>
|
|
||||||
|
|
||||||
int nTotalRows;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief callback function of subscription.
|
|
||||||
*
|
|
||||||
* @param tsub
|
|
||||||
* @param res
|
|
||||||
* @param param. the additional parameter passed to taos_subscribe
|
|
||||||
* @param code. error code
|
|
||||||
*/
|
|
||||||
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) {
|
|
||||||
if (code != 0) {
|
|
||||||
printf("error: %d\n", code);
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_ROW row = NULL;
|
|
||||||
int num_fields = taos_num_fields(res);
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
|
||||||
int nRows = 0;
|
|
||||||
|
|
||||||
while ((row = taos_fetch_row(res))) {
|
|
||||||
char buf[4096] = {0};
|
|
||||||
taos_print_row(buf, row, fields, num_fields);
|
|
||||||
puts(buf);
|
|
||||||
nRows++;
|
|
||||||
}
|
|
||||||
|
|
||||||
nTotalRows += nRows;
|
|
||||||
printf("%d rows consumed.\n", nRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
|
|
||||||
if (taos == NULL) {
|
|
||||||
printf("failed to connect to server\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
int restart = 1; // if the topic already exists, where to subscribe from the begin.
|
|
||||||
const char* topic = "topic-meter-current-bg-10";
|
|
||||||
const char* sql = "select * from power.meters where current > 10";
|
|
||||||
void* param = NULL; // additional parameter.
|
|
||||||
int interval = 2000; // consumption interval in microseconds.
|
|
||||||
TAOS_SUB* tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval);
|
|
||||||
|
|
||||||
// wait for insert from others process. you can open TDengine CLI to insert some records for test.
|
|
||||||
|
|
||||||
getchar(); // press Enter to stop
|
|
||||||
|
|
||||||
printf("total rows consumed: %d\n", nTotalRows);
|
|
||||||
int keep = 0; // whether to keep subscribe process
|
|
||||||
taos_unsubscribe(tsub, keep);
|
|
||||||
|
|
||||||
taos_close(taos);
|
|
||||||
taos_cleanup();
|
|
||||||
}
|
|
|
@ -3639,9 +3639,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve
|
||||||
|
|
||||||
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
|
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
|
||||||
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
|
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
|
||||||
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
|
void tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
|
||||||
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
||||||
int32_t tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
||||||
void tOffsetDestroy(void* pVal);
|
void tOffsetDestroy(void* pVal);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -9180,7 +9180,7 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
void tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
||||||
(void)snprintf(buf, maxLen, "none");
|
(void)snprintf(buf, maxLen, "none");
|
||||||
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEST) {
|
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||||
|
@ -9192,7 +9192,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) {
|
if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) {
|
||||||
char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1);
|
char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1);
|
||||||
if (tmp == NULL) return terrno;
|
if (tmp == NULL) return;
|
||||||
(void)memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData);
|
(void)memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData);
|
||||||
(void)snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts,
|
(void)snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts,
|
||||||
pVal->primaryKey.type, tmp);
|
pVal->primaryKey.type, tmp);
|
||||||
|
@ -9202,8 +9202,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
pVal->primaryKey.type, pVal->primaryKey.val);
|
pVal->primaryKey.type, pVal->primaryKey.val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
|
@ -9226,16 +9224,17 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
tOffsetDestroy(pLeft);
|
tOffsetDestroy(pLeft);
|
||||||
*pLeft = *pRight;
|
*pLeft = *pRight;
|
||||||
if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) {
|
if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) {
|
||||||
if ((pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData)) == NULL) {
|
pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData);
|
||||||
return terrno;
|
if (pLeft->primaryKey.pData == NULL) {
|
||||||
|
uError("failed to allocate memory for offset");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
(void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
|
(void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tOffsetDestroy(void *param) {
|
void tOffsetDestroy(void *param) {
|
||||||
|
|
Loading…
Reference in New Issue