diff --git a/docs/zh/14-reference/03-taos-sql/03-table.md b/docs/zh/14-reference/03-taos-sql/03-table.md index 2f0ae7100a..cad9190bd9 100644 --- a/docs/zh/14-reference/03-taos-sql/03-table.md +++ b/docs/zh/14-reference/03-taos-sql/03-table.md @@ -79,6 +79,18 @@ CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF 批量建表方式要求数据表必须以超级表为模板。 在不超出 SQL 语句长度限制的前提下,单条语句中的建表数量建议控制在 1000 ~ 3000 之间,将会获得比较理想的建表速度。 +### 使用 CSV 批量创建子表 + +```sql +CREATE TABLE [IF NOT EXISTS] USING [db_name.]stb_name (field1_name [, field2_name] ....) FILE csv_file_path; +``` + +**参数说明** + +1. FILE 语法表示数据来自于 CSV 文件(英文逗号分隔、英文单引号括住每个值),CSV 文件无需表头。CSV 文件中应仅包含 table name 与 tag 值。如需插入数据,请参考数据写入章节。 +2. 为指定的 stb_name 创建子表,该超级表必须已经存在。 +3. field_name 列表顺序与 CSV 文件各列内容顺序一致。列表中不允许出现重复项,且必须包含 `tbname`,可包含零个或多个超级表中已定义的标签列。未包含在列表中的标签值将被设置为 NULL。 + ## 修改普通表 ```sql diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 753f84d774..3910ea6745 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -89,32 +89,6 @@ typedef struct { int32_t exprIdx; } STupleKey; -typedef struct STuplePos { - union { - struct { - int32_t pageId; - int32_t offset; - }; - SWinKey streamTupleKey; - }; -} STuplePos; - -typedef struct SFirstLastRes { - bool hasResult; - // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, - // this attribute is required - bool isNull; - int32_t bytes; - int64_t ts; - char* pkData; - int32_t pkBytes; - int8_t pkType; - STuplePos pos; - STuplePos nullTuplePos; - bool nullTupleSaved; - char buf[]; -} SFirstLastRes; - static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { STupleKey* pTuple1 = (STupleKey*)pKey1; STupleKey* pTuple2 = (STupleKey*)pKey2; diff --git a/include/common/tvariant.h b/include/common/tvariant.h index 0d84b2414e..5ab42ae8b7 100644 --- a/include/common/tvariant.h +++ b/include/common/tvariant.h @@ -23,6 +23,7 @@ extern "C" { #endif // variant, each number/string/field_id has a corresponding struct during parsing sql +// **NOTE**: if you want to change this struct, please consider the backward compatibility of function top and bottom. typedef struct SVariant { uint32_t nType; int32_t nLen; // only used for string, for number, it is useless diff --git a/include/libs/function/function.h b/include/libs/function/function.h index ec01cf1f6f..7ca046762a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -23,6 +23,7 @@ extern "C" { #include "tcommon.h" #include "tsimplehash.h" #include "tvariant.h" +#include "functionResInfo.h" struct SqlFunctionCtx; struct SResultRowEntryInfo; @@ -85,14 +86,7 @@ enum { PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan }; -typedef struct SPoint1 { - int64_t key; - union { - double val; - char *ptr; - }; -} SPoint1; - +struct SPoint1; struct SqlFunctionCtx; struct SResultRowEntryInfo; diff --git a/include/libs/function/functionResInfo.h b/include/libs/function/functionResInfo.h new file mode 100644 index 0000000000..d79caf3f8c --- /dev/null +++ b/include/libs/function/functionResInfo.h @@ -0,0 +1,90 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . + */ + +#ifndef TDENGINE_FUNCTIONRESINFO_H +#define TDENGINE_FUNCTIONRESINFO_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "tcommon.h" + +typedef struct STuplePos { + union { + struct { + int32_t pageId; + int32_t offset; + }; + SWinKey streamTupleKey; + }; +} STuplePos; + +typedef struct SCentroid { + double mean; + int64_t weight; +} SCentroid; + +typedef struct SPt { + double value; + int64_t weight; +} SPt; + +typedef struct TDigest { + double compression; + int32_t threshold; + int64_t size; + + int64_t total_weight; + double min; + double max; + + int32_t num_buffered_pts; + SPt *buffered_pts; + + int32_t num_centroids; + SCentroid *centroids; +} TDigest; + +typedef struct SFirstLastRes { + bool hasResult; + // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, + // this attribute is required + bool isNull; + int32_t bytes; + int64_t ts; + char* pkData; + int32_t pkBytes; + int8_t pkType; + STuplePos pos; + STuplePos nullTuplePos; + bool nullTupleSaved; + char buf[]; +} SFirstLastRes; + +typedef struct SPoint1 { + int64_t key; + union { + double val; + char *ptr; + }; +} SPoint1; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_FUNCTIONRESINFO_H diff --git a/include/util/tdigest.h b/include/util/tdigest.h index 03519b4c7b..d807a7f0cf 100644 --- a/include/util/tdigest.h +++ b/include/util/tdigest.h @@ -23,6 +23,7 @@ #define TDIGEST_H #include "os.h" +#include "libs/function/functionResInfo.h" #ifndef M_PI #define M_PI 3.14159265358979323846264338327950288 /* pi */ @@ -37,32 +38,6 @@ #define TDIGEST_SIZE(compression) \ (sizeof(TDigest) + sizeof(SCentroid) * GET_CENTROID(compression) + sizeof(SPt) * GET_THRESHOLD(compression)) -typedef struct SCentroid { - double mean; - int64_t weight; -} SCentroid; - -typedef struct SPt { - double value; - int64_t weight; -} SPt; - -typedef struct TDigest { - double compression; - int32_t threshold; - int64_t size; - - int64_t total_weight; - double min; - double max; - - int32_t num_buffered_pts; - SPt *buffered_pts; - - int32_t num_centroids; - SCentroid *centroids; -} TDigest; - TDigest *tdigestNewFrom(void *pBuf, int32_t compression); int32_t tdigestAdd(TDigest *t, double x, int64_t w); int32_t tdigestMerge(TDigest *t1, TDigest *t2); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 0837154fce..b78e0d0f56 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1200,22 +1200,6 @@ static int stmtAddBatch2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); - if (pStmt->sql.stbInterlaceMode) { - int64_t startUs2 = taosGetTimestampUs(); - pStmt->stat.addBatchUs += startUs2 - startUs; - - pStmt->sql.siInfo.tableColsReady = false; - - SStmtQNode* param = NULL; - STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); - param->restoreTbCols = true; - param->next = NULL; - - stmtEnqueue(pStmt, param); - - return TSDB_CODE_SUCCESS; - } - STMT_ERR_RET(stmtCacheBlock(pStmt)); return TSDB_CODE_SUCCESS; @@ -1627,6 +1611,22 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); + if (pStmt->sql.stbInterlaceMode) { + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.addBatchUs += startUs2 - startUs; + + pStmt->sql.siInfo.tableColsReady = false; + + SStmtQNode* param = NULL; + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + param->restoreTbCols = true; + param->next = NULL; + + stmtEnqueue(pStmt, param); + + return TSDB_CODE_SUCCESS; + } + if (STMT_TYPE_QUERY != pStmt->sql.type) { if (pStmt->sql.stbInterlaceMode) { int64_t startTs = taosGetTimestampUs(); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 14594e02cc..fd6ca831d1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -41,7 +41,7 @@ #define SET_ERROR_MSG_TMQ(MSG) \ - if (errstr != NULL) (void)snprintf(errstr, errstrLen, MSG); + if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG); #define PROCESS_POLL_RSP(FUNC,DATA) \ SDecoder decoder = {0}; \ diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 9e37e785f8..d508d75922 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -14,6 +14,7 @@ */ #include "functionMgt.h" +#include "functionResInfo.h" #include "taoserror.h" #include "tarray.h" #include "tcommon.h" diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 77905792b8..36e53d0a80 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -22,29 +22,7 @@ extern "C" { #include "function.h" #include "functionMgt.h" - -typedef struct SSumRes { - union { - int64_t isum; - uint64_t usum; - double dsum; - }; - int16_t type; - int64_t prevTs; - bool isPrevTsSet; - bool overflow; // if overflow is true, dsum to be used for any type; -} SSumRes; - -typedef struct SMinmaxResInfo { - bool assign; // assign the first value or not - int64_t v; - char *str; - STuplePos tuplePos; - - STuplePos nullTuplePos; - bool nullTupleSaved; - int16_t type; -} SMinmaxResInfo; +#include "functionResInfoInt.h" int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems); diff --git a/source/libs/function/inc/functionResInfoInt.h b/source/libs/function/inc/functionResInfoInt.h new file mode 100644 index 0000000000..9ee1e884b3 --- /dev/null +++ b/source/libs/function/inc/functionResInfoInt.h @@ -0,0 +1,366 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#ifndef TDENGINE_FUNCTIONRESINFOINT_H +#define TDENGINE_FUNCTIONRESINFOINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "thistogram.h" +#include "tdigest.h" +#include "functionResInfo.h" +#include "tpercentile.h" + +#define USE_ARRAYLIST + +#define HLL_BUCKET_BITS 14 // The bits of the bucket +#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS) +#define HLL_BUCKETS (1 << HLL_BUCKET_BITS) +#define HLL_BUCKET_MASK (HLL_BUCKETS - 1) +#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2) + +typedef struct SSumRes { + union { + int64_t isum; + uint64_t usum; + double dsum; + }; + int16_t type; + int64_t prevTs; + bool isPrevTsSet; + bool overflow; // if overflow is true, dsum to be used for any type; +} SSumRes; + +typedef struct SMinmaxResInfo { + bool assign; // assign the first value or not + int64_t v; + char *str; + STuplePos tuplePos; + + STuplePos nullTuplePos; + bool nullTupleSaved; + int16_t type; +} SMinmaxResInfo; + +typedef struct SStdRes { + double result; + int64_t count; + union { + double quadraticDSum; + int64_t quadraticISum; + uint64_t quadraticUSum; + }; + union { + double dsum; + int64_t isum; + uint64_t usum; + }; + int16_t type; +} SStdRes; + +typedef struct SHistBin { + double val; + int64_t num; + +#if !defined(USE_ARRAYLIST) + double delta; + int32_t index; // index in min-heap list +#endif +} SHistBin; + +typedef struct SHistogramInfo { + int64_t numOfElems; + int32_t numOfEntries; + int32_t maxEntries; + double min; + double max; +#if defined(USE_ARRAYLIST) + SHistBin* elems; +#else + tSkipList* pList; + SMultiwayMergeTreeInfo* pLoserTree; + int32_t maxIndex; + bool ordered; +#endif +} SHistogramInfo; + +typedef struct SAPercentileInfo { + double result; + double percent; + int8_t algo; + SHistogramInfo* pHisto; + TDigest* pTDigest; +} SAPercentileInfo; + +typedef struct SSpreadInfo { + double result; + bool hasResult; + double min; + double max; +} SSpreadInfo; + +typedef struct SHLLFuncInfo { + uint64_t result; + uint64_t totalCount; + uint8_t buckets[HLL_BUCKETS]; +} SHLLInfo; + +typedef struct SGroupKeyInfo { + bool hasResult; + bool isNull; + char data[]; +} SGroupKeyInfo; + +typedef struct SAvgRes { + double result; + SSumRes sum; + int64_t count; + int16_t type; // store the original input type, used in merge function +} SAvgRes; + + +// structs above are used in stream + +#define HISTOGRAM_MAX_BINS_NUM 1000 +#define MAVG_MAX_POINTS_NUM 1000 +#define TAIL_MAX_POINTS_NUM 100 +#define TAIL_MAX_OFFSET 100 + +typedef struct STopBotResItem { + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + STuplePos tuplePos; // tuple data of this chosen row +} STopBotResItem; + +typedef struct STopBotRes { + int32_t maxSize; + int16_t type; + + STuplePos nullTuplePos; + bool nullTupleSaved; + + STopBotResItem* pItems; +} STopBotRes; + +typedef struct SLeastSQRInfo { + double matrix[2][3]; + double startVal; + double stepVal; + int64_t num; +} SLeastSQRInfo; + +typedef struct MinMaxEntry { + union { + double dMinVal; + // double i64MinVal; + uint64_t u64MinVal; + }; + union { + double dMaxVal; + // double i64MaxVal; + int64_t u64MaxVal; + }; +} MinMaxEntry; + +typedef struct { + int32_t size; + int32_t pageId; + SFilePage *data; +} SSlotInfo; + +typedef struct tMemBucketSlot { + SSlotInfo info; + MinMaxEntry range; +} tMemBucketSlot; + +struct tMemBucket; +typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value, int32_t *index); + +typedef struct tMemBucket { + int16_t numOfSlots; + int16_t type; + int32_t bytes; + int32_t total; + int32_t elemPerPage; // number of elements for each object + int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result + int32_t bufPageSize; // disk page size + MinMaxEntry range; // value range + int32_t times; // count that has been checked for deciding the correct data value buckets. + __compar_fn_t comparFn; + tMemBucketSlot *pSlots; + SDiskbasedBuf *pBuffer; + __perc_hash_func_t hashFunc; + SHashObj *groupPagesMap; // disk page map for different groups; +} tMemBucket; + +typedef struct SPercentileInfo { + double result; + tMemBucket* pMemBucket; + int32_t stage; + double minval; + double maxval; + int64_t numOfElems; +} SPercentileInfo; + +typedef struct SDiffInfo { + bool hasPrev; + bool isFirstRow; + int8_t ignoreOption; // replace the ignore with case when + union { + int64_t i64; + double d64; + } prev; + + int64_t prevTs; +} SDiffInfo; + +typedef struct SElapsedInfo { + double result; + TSKEY min; + TSKEY max; + int64_t timeUnit; +} SElapsedInfo; + +typedef struct STwaInfo { + double dOutput; + int64_t numOfElems; + SPoint1 p; + STimeWindow win; +} STwaInfo; + +typedef struct SHistoFuncBin { + double lower; + double upper; + int64_t count; + double percentage; +} SHistoFuncBin; + +typedef struct SHistoFuncInfo { + int32_t numOfBins; + int32_t totalCount; + bool normalized; + SHistoFuncBin bins[]; +} SHistoFuncInfo; + +typedef struct SStateInfo { + union { + int64_t count; + int64_t durationStart; + }; + int64_t prevTs; + bool isPrevTsSet; +} SStateInfo; + +typedef struct SMavgInfo { + int32_t pos; + double sum; + int64_t prevTs; + bool isPrevTsSet; + int32_t numOfPoints; + bool pointsMeet; + double points[]; +} SMavgInfo; + +typedef struct SSampleInfo { + int32_t samples; + int32_t totalPoints; + int32_t numSampled; + uint8_t colType; + uint16_t colBytes; + + STuplePos nullTuplePos; + bool nullTupleSaved; + + char* data; + STuplePos* tuplePos; +} SSampleInfo; + +typedef struct STailItem { + int64_t timestamp; + bool isNull; + char data[]; +} STailItem; + +typedef struct STailInfo { + int32_t numOfPoints; + int32_t numAdded; + int32_t offset; + uint8_t colType; + uint16_t colBytes; + STailItem** pItems; +} STailInfo; + +typedef struct SUniqueItem { + int64_t timestamp; + bool isNull; + char data[]; +} SUniqueItem; + +typedef struct SUniqueInfo { + int32_t numOfPoints; + uint8_t colType; + uint16_t colBytes; + bool hasNull; // null is not hashable, handle separately + SHashObj* pHash; + char pItems[]; +} SUniqueInfo; + +typedef struct SModeItem { + int64_t count; + STuplePos dataPos; + STuplePos tuplePos; +} SModeItem; + +typedef struct SModeInfo { + uint8_t colType; + uint16_t colBytes; + SHashObj* pHash; + + STuplePos nullTuplePos; + bool nullTupleSaved; + + char* buf; // serialize data buffer +} SModeInfo; + +typedef struct SDerivInfo { + double prevValue; // previous value + TSKEY prevTs; // previous timestamp + bool ignoreNegative; // ignore the negative value + int64_t tsWindow; // time window for derivative + bool valueSet; // the value has been set already +} SDerivInfo; + +typedef struct SRateInfo { + double firstValue; + TSKEY firstKey; + double lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + + char* firstPk; + char* lastPk; + int8_t pkType; + int32_t pkBytes; + char pkData[]; +} SRateInfo; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_FUNCTIONRESINFOINT_H diff --git a/source/libs/function/inc/thistogram.h b/source/libs/function/inc/thistogram.h index 08bff7117e..26681d0426 100644 --- a/source/libs/function/inc/thistogram.h +++ b/source/libs/function/inc/thistogram.h @@ -16,6 +16,8 @@ #ifndef TDENGINE_HISTOGRAM_H #define TDENGINE_HISTOGRAM_H +#include "functionResInfoInt.h" + #ifdef __cplusplus extern "C" { #endif @@ -24,51 +26,28 @@ extern "C" { #define MAX_HISTOGRAM_BIN 500 -typedef struct SHistBin { - double val; - int64_t num; - -#if !defined(USE_ARRAYLIST) - double delta; - int32_t index; // index in min-heap list -#endif -} SHistBin; - typedef struct SHeapEntry { void* pData; double val; } SHeapEntry; -typedef struct SHistogramInfo { - int64_t numOfElems; - int32_t numOfEntries; - int32_t maxEntries; - double min; - double max; -#if defined(USE_ARRAYLIST) - SHistBin* elems; -#else - tSkipList* pList; - SMultiwayMergeTreeInfo* pLoserTree; - int32_t maxIndex; - bool ordered; -#endif -} SHistogramInfo; +struct SHistogramInfo; +struct SHistBin; -int32_t tHistogramCreate(int32_t numOfEntries, SHistogramInfo** pHisto); -SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins); +int32_t tHistogramCreate(int32_t numOfEntries, struct SHistogramInfo** pHisto); +struct SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins); -int32_t tHistogramAdd(SHistogramInfo** pHisto, double val); -int32_t tHistogramSum(SHistogramInfo* pHisto, double v, int64_t *res); +int32_t tHistogramAdd(struct SHistogramInfo** pHisto, double val); +int32_t tHistogramSum(struct SHistogramInfo* pHisto, double v, int64_t *res); -int32_t tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num, double** pVal); -int32_t tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries, - SHistogramInfo** pResHistogram); -void tHistogramDestroy(SHistogramInfo** pHisto); +int32_t tHistogramUniform(struct SHistogramInfo* pHisto, double* ratio, int32_t num, double** pVal); +int32_t tHistogramMerge(struct SHistogramInfo* pHisto1, struct SHistogramInfo* pHisto2, int32_t numOfEntries, + struct SHistogramInfo** pResHistogram); +void tHistogramDestroy(struct SHistogramInfo** pHisto); -void tHistogramPrint(SHistogramInfo* pHisto); +void tHistogramPrint(struct SHistogramInfo* pHisto); -int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val); +int32_t histoBinarySearch(struct SHistBin* pEntry, int32_t len, double val); SHeapEntry* tHeapCreate(int32_t numOfEntries); void tHeapSort(SHeapEntry* pEntry, int32_t len); diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 09df42d3a3..35067fa3ea 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -21,59 +21,18 @@ extern "C" { #endif #include "tpagedbuf.h" - -typedef struct MinMaxEntry { - union { - double dMinVal; - // double i64MinVal; - uint64_t u64MinVal; - }; - union { - double dMaxVal; - // double i64MaxVal; - int64_t u64MaxVal; - }; -} MinMaxEntry; - -typedef struct { - int32_t size; - int32_t pageId; - SFilePage *data; -} SSlotInfo; - -typedef struct tMemBucketSlot { - SSlotInfo info; - MinMaxEntry range; -} tMemBucketSlot; +#include "functionResInfoInt.h" struct tMemBucket; -typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value, int32_t *index); - -typedef struct tMemBucket { - int16_t numOfSlots; - int16_t type; - int32_t bytes; - int32_t total; - int32_t elemPerPage; // number of elements for each object - int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result - int32_t bufPageSize; // disk page size - MinMaxEntry range; // value range - int32_t times; // count that has been checked for deciding the correct data value buckets. - __compar_fn_t comparFn; - tMemBucketSlot *pSlots; - SDiskbasedBuf *pBuffer; - __perc_hash_func_t hashFunc; - SHashObj *groupPagesMap; // disk page map for different groups; -} tMemBucket; int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup, - tMemBucket **pBucket); + struct tMemBucket **pBucket); -void tMemBucketDestroy(tMemBucket **pBucket); +void tMemBucketDestroy(struct tMemBucket **pBucket); -int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); +int32_t tMemBucketPut(struct tMemBucket *pBucket, const void *data, size_t size); -int32_t getPercentile(tMemBucket *pMemBucket, double percent, double *result); +int32_t getPercentile(struct tMemBucket *pMemBucket, double percent, double *result); #endif // TDENGINE_TPERCENTILE_H diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index f13685239a..2e5dd5e381 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -16,6 +16,7 @@ #include "builtinsimpl.h" #include "cJSON.h" #include "function.h" +#include "functionResInfoInt.h" #include "query.h" #include "querynodes.h" #include "tanal.h" @@ -27,82 +28,12 @@ #include "thistogram.h" #include "tpercentile.h" -#define HISTOGRAM_MAX_BINS_NUM 1000 -#define MAVG_MAX_POINTS_NUM 1000 -#define TAIL_MAX_POINTS_NUM 100 -#define TAIL_MAX_OFFSET 100 - -#define HLL_BUCKET_BITS 14 // The bits of the bucket -#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS) -#define HLL_BUCKETS (1 << HLL_BUCKET_BITS) -#define HLL_BUCKET_MASK (HLL_BUCKETS - 1) -#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2) - -// typedef struct SMinmaxResInfo { -// bool assign; // assign the first value or not -// int64_t v; -// STuplePos tuplePos; -// -// STuplePos nullTuplePos; -// bool nullTupleSaved; -// int16_t type; -// } SMinmaxResInfo; - -typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - STuplePos tuplePos; // tuple data of this chosen row -} STopBotResItem; - -typedef struct STopBotRes { - int32_t maxSize; - int16_t type; - - STuplePos nullTuplePos; - bool nullTupleSaved; - - STopBotResItem* pItems; -} STopBotRes; - -typedef struct SStdRes { - double result; - int64_t count; - union { - double quadraticDSum; - int64_t quadraticISum; - uint64_t quadraticUSum; - }; - union { - double dsum; - int64_t isum; - uint64_t usum; - }; - int16_t type; -} SStdRes; - -typedef struct SLeastSQRInfo { - double matrix[2][3]; - double startVal; - double stepVal; - int64_t num; -} SLeastSQRInfo; - -typedef struct SPercentileInfo { - double result; - tMemBucket* pMemBucket; - int32_t stage; - double minval; - double maxval; - int64_t numOfElems; -} SPercentileInfo; - -typedef struct SAPercentileInfo { - double result; - double percent; - int8_t algo; - SHistogramInfo* pHisto; - TDigest* pTDigest; -} SAPercentileInfo; +bool ignoreNegative(int8_t ignoreOption){ + return (ignoreOption & 0x1) == 0x1; +} +bool ignoreNull(int8_t ignoreOption){ + return (ignoreOption & 0x2) == 0x2; +} typedef enum { APERCT_ALGO_UNKNOWN = 0, @@ -110,76 +41,8 @@ typedef enum { APERCT_ALGO_TDIGEST, } EAPerctAlgoType; -typedef struct SDiffInfo { - bool hasPrev; - bool isFirstRow; - int8_t ignoreOption; // replace the ignore with case when - union { - int64_t i64; - double d64; - } prev; - - int64_t prevTs; -} SDiffInfo; - -bool ignoreNegative(int8_t ignoreOption){ - return (ignoreOption & 0x1) == 0x1; -} -bool ignoreNull(int8_t ignoreOption){ - return (ignoreOption & 0x2) == 0x2; -} -typedef struct SSpreadInfo { - double result; - bool hasResult; - double min; - double max; -} SSpreadInfo; - -typedef struct SElapsedInfo { - double result; - TSKEY min; - TSKEY max; - int64_t timeUnit; -} SElapsedInfo; - -typedef struct STwaInfo { - double dOutput; - int64_t numOfElems; - SPoint1 p; - STimeWindow win; -} STwaInfo; - -typedef struct SHistoFuncBin { - double lower; - double upper; - int64_t count; - double percentage; -} SHistoFuncBin; - -typedef struct SHistoFuncInfo { - int32_t numOfBins; - int32_t totalCount; - bool normalized; - SHistoFuncBin bins[]; -} SHistoFuncInfo; - typedef enum { UNKNOWN_BIN = 0, USER_INPUT_BIN, LINEAR_BIN, LOG_BIN } EHistoBinType; -typedef struct SHLLFuncInfo { - uint64_t result; - uint64_t totalCount; - uint8_t buckets[HLL_BUCKETS]; -} SHLLInfo; - -typedef struct SStateInfo { - union { - int64_t count; - int64_t durationStart; - }; - int64_t prevTs; - bool isPrevTsSet; -} SStateInfo; - typedef enum { STATE_OPER_INVALID = 0, STATE_OPER_LT, @@ -190,105 +53,6 @@ typedef enum { STATE_OPER_EQ, } EStateOperType; -typedef struct SMavgInfo { - int32_t pos; - double sum; - int64_t prevTs; - bool isPrevTsSet; - int32_t numOfPoints; - bool pointsMeet; - double points[]; -} SMavgInfo; - -typedef struct SSampleInfo { - int32_t samples; - int32_t totalPoints; - int32_t numSampled; - uint8_t colType; - uint16_t colBytes; - - STuplePos nullTuplePos; - bool nullTupleSaved; - - char* data; - STuplePos* tuplePos; -} SSampleInfo; - -typedef struct STailItem { - int64_t timestamp; - bool isNull; - char data[]; -} STailItem; - -typedef struct STailInfo { - int32_t numOfPoints; - int32_t numAdded; - int32_t offset; - uint8_t colType; - uint16_t colBytes; - STailItem** pItems; -} STailInfo; - -typedef struct SUniqueItem { - int64_t timestamp; - bool isNull; - char data[]; -} SUniqueItem; - -typedef struct SUniqueInfo { - int32_t numOfPoints; - uint8_t colType; - uint16_t colBytes; - bool hasNull; // null is not hashable, handle separately - SHashObj* pHash; - char pItems[]; -} SUniqueInfo; - -typedef struct SModeItem { - int64_t count; - STuplePos dataPos; - STuplePos tuplePos; -} SModeItem; - -typedef struct SModeInfo { - uint8_t colType; - uint16_t colBytes; - SHashObj* pHash; - - STuplePos nullTuplePos; - bool nullTupleSaved; - - char* buf; // serialize data buffer -} SModeInfo; - -typedef struct SDerivInfo { - double prevValue; // previous value - TSKEY prevTs; // previous timestamp - bool ignoreNegative; // ignore the negative value - int64_t tsWindow; // time window for derivative - bool valueSet; // the value has been set already -} SDerivInfo; - -typedef struct SRateInfo { - double firstValue; - TSKEY firstKey; - double lastValue; - TSKEY lastKey; - int8_t hasResult; // flag to denote has value - - char* firstPk; - char* lastPk; - int8_t pkType; - int32_t pkBytes; - char pkData[]; -} SRateInfo; - -typedef struct SGroupKeyInfo { - bool hasResult; - bool isNull; - char data[]; -} SGroupKeyInfo; - #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 854260c354..b1bef84511 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -92,13 +92,6 @@ out->sum.usum += val; \ } -typedef struct SAvgRes { - double result; - SSumRes sum; - int64_t count; - int16_t type; // store the original input type, used in merge function -} SAvgRes; - static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pRes) { const int32_t bitWidth = 256; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 71e29a7aa1..9ae89436f2 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -3006,20 +3006,56 @@ static int32_t doScalarFunction2(SScalarParam *pInput, int32_t inputNum, SScalar } break; } - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT:{ + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData[0]->pData; + int8_t *out = (int8_t *)pOutputData->pData; + int8_t result = (int8_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData[0]->pData; + int16_t *out = (int16_t *)pOutputData->pData; + int16_t result = (int16_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData[0]->pData; + int32_t *out = (int32_t *)pOutputData->pData; + int32_t result = (int32_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_BIGINT: { int64_t *in = (int64_t *)pInputData[0]->pData; int64_t *out = (int64_t *)pOutputData->pData; int64_t result = (int64_t)d1((double)in[i], in2); out[i] = result; break; } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT:{ + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData[0]->pData; + uint8_t *out = (uint8_t *)pOutputData->pData; + uint8_t result = (uint8_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData[0]->pData; + uint16_t *out = (uint16_t *)pOutputData->pData; + uint16_t result = (uint16_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData[0]->pData; + uint32_t *out = (uint32_t *)pOutputData->pData; + uint32_t result = (uint32_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UBIGINT: { uint64_t *in = (uint64_t *)pInputData[0]->pData; uint64_t *out = (uint64_t *)pOutputData->pData; uint64_t result = (uint64_t)d1((double)in[i], in2); @@ -3062,20 +3098,56 @@ static int32_t doScalarFunction2(SScalarParam *pInput, int32_t inputNum, SScalar } break; } - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT:{ + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData[0]->pData; + int8_t *out = (int8_t *)pOutputData->pData; + int8_t result = (int8_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData[0]->pData; + int16_t *out = (int16_t *)pOutputData->pData; + int16_t result = (int16_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData[0]->pData; + int32_t *out = (int32_t *)pOutputData->pData; + int32_t result = (int32_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_BIGINT: { int64_t *in = (int64_t *)pInputData[0]->pData; int64_t *out = (int64_t *)pOutputData->pData; int64_t result = (int64_t)d1((double)in[0], in2); out[i] = result; break; } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT:{ + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData[0]->pData; + uint8_t *out = (uint8_t *)pOutputData->pData; + uint8_t result = (uint8_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData[0]->pData; + uint16_t *out = (uint16_t *)pOutputData->pData; + uint16_t result = (uint16_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData[0]->pData; + uint32_t *out = (uint32_t *)pOutputData->pData; + uint32_t result = (uint32_t)d1((double)in[0], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UBIGINT: { uint64_t *in = (uint64_t *)pInputData[0]->pData; uint64_t *out = (uint64_t *)pOutputData->pData; uint64_t result = (uint64_t)d1((double)in[0], in2); @@ -3119,20 +3191,56 @@ static int32_t doScalarFunction2(SScalarParam *pInput, int32_t inputNum, SScalar } break; } - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT:{ + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData[0]->pData; + int8_t *out = (int8_t *)pOutputData->pData; + int8_t result = (int8_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData[0]->pData; + int16_t *out = (int16_t *)pOutputData->pData; + int16_t result = (int16_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData[0]->pData; + int32_t *out = (int32_t *)pOutputData->pData; + int32_t result = (int32_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_BIGINT: { int64_t *in = (int64_t *)pInputData[0]->pData; int64_t *out = (int64_t *)pOutputData->pData; int64_t result = (int64_t)d1((double)in[i], in2); out[i] = result; break; } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT:{ + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData[0]->pData; + uint8_t *out = (uint8_t *)pOutputData->pData; + uint8_t result = (uint8_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData[0]->pData; + uint16_t *out = (uint16_t *)pOutputData->pData; + uint16_t result = (uint16_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData[0]->pData; + uint32_t *out = (uint32_t *)pOutputData->pData; + uint32_t result = (uint32_t)d1((double)in[i], in2); + out[i] = result; + break; + } + case TSDB_DATA_TYPE_UBIGINT: { uint64_t *in = (uint64_t *)pInputData[0]->pData; uint64_t *out = (uint64_t *)pOutputData->pData; uint64_t result = (uint64_t)d1((double)in[i], in2); diff --git a/tests/army/query/function/test_resinfo.py b/tests/army/query/function/test_resinfo.py new file mode 100644 index 0000000000..51d51f3ce1 --- /dev/null +++ b/tests/army/query/function/test_resinfo.py @@ -0,0 +1,69 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random +import hashlib + +import taos +import frame +import frame.etool + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * + +initial_hash_resinfoInt = "e739cde34b98f13dd9ad696d18f060cc" +initial_hash_resinfo = "172d04aa7af0d8cd2e4d9df284079958" + +class TDTestCase(TBase): + def get_file_hash(self, file_path): + hasher = hashlib.md5() + with open(file_path, 'rb') as f: + buf = f.read() + hasher.update(buf) + return hasher.hexdigest() + + def testFileChanged(self): + tdLog.info(f"insert data.") + # taosBenchmark run + resinfoIntFile = etool.curFile(__file__, "../../../../source/libs/function/inc/functionResInfoInt.h") + resinfoFile = etool.curFile(__file__, "../../../../include/libs/function/functionResInfo.h") + current_hash = self.get_file_hash(resinfoIntFile) + if current_hash != initial_hash_resinfoInt: + tdLog.exit(f"{resinfoIntFile} has been modified.") + else: + tdLog.success(f"{resinfoIntFile} is not modified.") + current_hash = self.get_file_hash(resinfoFile) + if current_hash != initial_hash_resinfo: + tdLog.exit(f"{resinfoFile} has been modified.") + else: + tdLog.success(f"{resinfoFile} is not modified.") + + + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + + # insert data + self.testFileChanged() + + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 450ef91798..81ffaf0d4e 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -15,6 +15,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_function.py +,,y,army,./pytest.sh python3 ./test.py -f query/function/test_resinfo.py ,,y,army,./pytest.sh python3 ./test.py -f query/function/concat.py ,,y,army,./pytest.sh python3 ./test.py -f query/function/cast.py ,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py @@ -283,6 +284,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py diff --git a/tests/system-test/7-tmq/td-32187.py b/tests/system-test/7-tmq/td-32187.py new file mode 100644 index 0000000000..7f971b23da --- /dev/null +++ b/tests/system-test/7-tmq/td-32187.py @@ -0,0 +1,45 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.execute(f'create database if not exists db_32187') + tdSql.execute(f'use db_32187') + tdSql.execute(f'create stable if not exists s5466 (ts timestamp, c1 int, c2 int) tags (t binary(32))') + tdSql.execute(f'insert into t1 using s5466 tags("__devicid__") values(1669092069068, 0, 1)') + tdSql.execute(f'insert into t1(ts, c1, c2) values(1669092069067, 0, 1)') + + tdSql.execute("create topic topic_test with meta as database db_32187") + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_td32187'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + return + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 29b2fcc99f..8701f208bb 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -5,6 +5,7 @@ add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(tmq_ts5466 tmq_ts5466.c) add_executable(tmq_td32526 tmq_td32526.c) +add_executable(tmq_td32187 tmq_td32187.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -63,6 +64,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_td32187 + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC taos diff --git a/utils/test/c/tmq_td32187.c b/utils/test/c/tmq_td32187.c new file mode 100644 index 0000000000..fb26e248c9 --- /dev/null +++ b/utils/test/c/tmq_td32187.c @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + + +static TAOS_RES* tmqmessage = NULL; +static char* topic = "topic_test"; +static int32_t vgroupId = 0; +static int64_t offset = 0; + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, topic); + return topic_list; +} + +static void callFunc(int i, tmq_t* tmq, tmq_list_t* topics) { + printf("call %d\n", i); + switch (i) { + case 0: + tmq_subscribe(tmq, topics); + break; + case 1: + tmq_unsubscribe(tmq); + break; + case 2:{ + tmq_list_t* t = NULL; + tmq_subscription(tmq, &t); + tmq_list_destroy(t); + break; + } + case 3: + taos_free_result(tmqmessage); + tmqmessage = tmq_consumer_poll(tmq, 5000); + break; + case 4: +// tmq_consumer_close(tmq); + break; + case 5: + tmq_commit_sync(tmq, NULL); + break; + case 6: + tmq_commit_async(tmq, NULL, NULL, NULL); + break; + case 7: + tmq_commit_offset_sync(tmq, topic, vgroupId, offset); + break; + case 8: + tmq_commit_offset_async(tmq, topic, vgroupId, offset, NULL, NULL); + break; + case 9: + tmq_get_topic_assignment(tmq, topic, NULL, NULL); + break; + case 10: + tmq_free_assignment(NULL); + break; + case 11: + tmq_offset_seek(tmq, topic, vgroupId, offset); + break; + case 12: + tmq_position(tmq, topic, vgroupId); + break; + case 13: + tmq_committed(tmq, topic, vgroupId); + break; + case 14: + tmq_get_connect(tmq); + break; + case 15: + tmq_get_table_name(tmqmessage); + break; + case 16: + vgroupId = tmq_get_vgroup_id(tmqmessage); + break; + case 17: + offset = tmq_get_vgroup_offset(tmqmessage); + break; + case 18: + tmq_get_res_type(tmqmessage); + break; + case 19: + tmq_get_topic_name(tmqmessage); + break; + case 20: + tmq_get_db_name(tmqmessage); + break; + default: + break; + } +} +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (1) { + tmqmessage = tmq_consumer_poll(tmq, 5000); + if (tmqmessage) { + printf("poll message\n"); + while(cnt < 100){ + uint32_t i = taosRand()%21; + callFunc(i, tmq, topics); + callFunc(i, tmq, topics); + cnt++; + } + while(cnt < 300){ + uint32_t i = taosRand()%21; + callFunc(i, tmq, topics); + cnt++; + } + taos_free_result(tmqmessage); + } + break; + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); +} \ No newline at end of file