Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1
This commit is contained in:
commit
2f923ead39
|
@ -206,6 +206,8 @@ typedef struct SqlFunctionCtx {
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
struct SSDataBlock *pSrcBlock;
|
struct SSDataBlock *pSrcBlock;
|
||||||
int32_t curBufPage;
|
int32_t curBufPage;
|
||||||
|
|
||||||
|
char* udfName[TSDB_FUNC_NAME_LEN];
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -334,8 +336,6 @@ int32_t udfcOpen();
|
||||||
*/
|
*/
|
||||||
int32_t udfcClose();
|
int32_t udfcClose();
|
||||||
|
|
||||||
typedef void *UdfcFuncHandle;
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -42,8 +42,7 @@ enum {
|
||||||
UDFC_CODE_INVALID_STATE = -5
|
UDFC_CODE_INVALID_STATE = -5
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef void *UdfcFuncHandle;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* setup udf
|
* setup udf
|
||||||
|
@ -95,6 +94,7 @@ typedef struct SUdfDataBlock {
|
||||||
typedef struct SUdfInterBuf {
|
typedef struct SUdfInterBuf {
|
||||||
int32_t bufLen;
|
int32_t bufLen;
|
||||||
char* buf;
|
char* buf;
|
||||||
|
int8_t numOfResult; //zero or one
|
||||||
} SUdfInterBuf;
|
} SUdfInterBuf;
|
||||||
|
|
||||||
// output: interBuf
|
// output: interBuf
|
||||||
|
@ -118,6 +118,10 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
|
||||||
*/
|
*/
|
||||||
int32_t teardownUdf(UdfcFuncHandle handle);
|
int32_t teardownUdf(UdfcFuncHandle handle);
|
||||||
|
|
||||||
|
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
|
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
|
||||||
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
// end API to taosd and qworker
|
// end API to taosd and qworker
|
||||||
//=============================================================================================================================
|
//=============================================================================================================================
|
||||||
// begin API to UDF writer.
|
// begin API to UDF writer.
|
||||||
|
@ -133,11 +137,11 @@ typedef int32_t (*TUdfTeardownFunc)();
|
||||||
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
|
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
|
||||||
|
|
||||||
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
|
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
|
||||||
|
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
|
||||||
|
|
||||||
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
|
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
|
||||||
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
|
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
|
||||||
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf);
|
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
|
||||||
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData);
|
|
||||||
|
|
||||||
|
|
||||||
// end API to UDF writer
|
// end API to UDF writer
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef _TSTREAMUPDATE_H_
|
||||||
|
#define _TSTREAMUPDATE_H_
|
||||||
|
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "tscalablebf.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct SUpdateInfo {
|
||||||
|
SArray *pTsBuckets;
|
||||||
|
uint64_t numBuckets;
|
||||||
|
SArray *pTsSBFs;
|
||||||
|
uint64_t numSBFs;
|
||||||
|
int64_t interval;
|
||||||
|
int64_t watermark;
|
||||||
|
TSKEY minTS;
|
||||||
|
} SUpdateInfo;
|
||||||
|
|
||||||
|
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
||||||
|
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||||
|
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
|
||||||
|
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
|
@ -218,6 +218,13 @@ void taosArrayClear(SArray* pArray);
|
||||||
*/
|
*/
|
||||||
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
|
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* clear the array (remove all element)
|
||||||
|
* @param pArray
|
||||||
|
* @param fp
|
||||||
|
*/
|
||||||
|
void taosArrayClearP(SArray* pArray, FDelete fp);
|
||||||
|
|
||||||
void* taosArrayDestroy(SArray* pArray);
|
void* taosArrayDestroy(SArray* pArray);
|
||||||
void taosArrayDestroyP(SArray* pArray, FDelete fp);
|
void taosArrayDestroyP(SArray* pArray, FDelete fp);
|
||||||
void taosArrayDestroyEx(SArray* pArray, FDelete fp);
|
void taosArrayDestroyEx(SArray* pArray, FDelete fp);
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_UTIL_BLOOMFILTER_H_
|
||||||
|
#define _TD_UTIL_BLOOMFILTER_H_
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "thash.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct SBloomFilter {
|
||||||
|
uint32_t hashFunctions;
|
||||||
|
uint64_t expectedEntries;
|
||||||
|
uint64_t numUnits;
|
||||||
|
uint64_t numBits;
|
||||||
|
uint64_t size;
|
||||||
|
_hash_fn_t hashFn1;
|
||||||
|
_hash_fn_t hashFn2;
|
||||||
|
void *buffer;
|
||||||
|
double errorRate;
|
||||||
|
} SBloomFilter;
|
||||||
|
|
||||||
|
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
|
||||||
|
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
|
||||||
|
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
|
||||||
|
uint32_t len);
|
||||||
|
void tBloomFilterDestroy(SBloomFilter *pBF);
|
||||||
|
void tBloomFilterDump(const SBloomFilter *pBF);
|
||||||
|
bool tBloomFilterIsFull(const SBloomFilter *pBF);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_UTIL_BLOOMFILTER_H_*/
|
|
@ -132,6 +132,7 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
||||||
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
||||||
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
||||||
|
#define TSDB_PERFS_TABLE_STREAMS "streams"
|
||||||
|
|
||||||
#define TSDB_INDEX_TYPE_SMA "SMA"
|
#define TSDB_INDEX_TYPE_SMA "SMA"
|
||||||
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_UTIL_SCALABLEBF_H_
|
||||||
|
#define _TD_UTIL_SCALABLEBF_H_
|
||||||
|
|
||||||
|
#include "tbloomfilter.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct SScalableBf {
|
||||||
|
SArray *bfArray; // array of bloom filters
|
||||||
|
uint32_t growth;
|
||||||
|
uint64_t numBits;
|
||||||
|
} SScalableBf;
|
||||||
|
|
||||||
|
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
|
||||||
|
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
||||||
|
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
|
||||||
|
uint32_t len);
|
||||||
|
void tScalableBfDestroy(SScalableBf *pSBf);
|
||||||
|
void tScalableBfDump(const SScalableBf *pSBf);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_UTIL_SCALABLEBF_H_*/
|
|
@ -188,7 +188,6 @@ typedef struct SRequestSendRecvBody {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t resType;
|
int8_t resType;
|
||||||
int32_t code;
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
|
|
|
@ -582,8 +582,9 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||||
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
char targetDb[TSDB_DB_FNAME_LEN];
|
||||||
|
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
|
|
@ -812,6 +812,7 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
||||||
|
|
||||||
// subscribed topics
|
// subscribed topics
|
||||||
|
// TODO: split into multiple rows
|
||||||
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
|
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
|
||||||
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
||||||
|
|
|
@ -410,7 +410,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
/*int32_t outputNameSz = 0;*/
|
/*int32_t outputNameSz = 0;*/
|
||||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||||
|
@ -456,7 +456,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
|
||||||
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||||
|
|
|
@ -124,14 +124,6 @@ static const SInfosTableSchema userStbsSchema[] = {
|
||||||
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema userStreamsSchema[] = {
|
|
||||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
|
||||||
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
};
|
|
||||||
|
|
||||||
static const SInfosTableSchema userTblsSchema[] = {
|
static const SInfosTableSchema userTblsSchema[] = {
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
@ -259,7 +251,6 @@ static const SInfosTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
||||||
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
||||||
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
||||||
{TSDB_INS_TABLE_USER_STREAMS, userStreamsSchema, tListLen(userStreamsSchema)},
|
|
||||||
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
||||||
{TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
{TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
||||||
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
||||||
|
|
|
@ -76,6 +76,18 @@ static const SPerfsTableSchema offsetSchema[] = {
|
||||||
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const SPerfsTableSchema streamSchema[] = {
|
||||||
|
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
};
|
||||||
|
|
||||||
static const SPerfsTableMeta perfsMeta[] = {
|
static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
|
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
|
||||||
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
||||||
|
@ -83,6 +95,7 @@ static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
||||||
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
||||||
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
||||||
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
};
|
};
|
||||||
|
|
||||||
// connection/application/
|
// connection/application/
|
||||||
|
|
|
@ -382,7 +382,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
|
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitSma(SMnode *pMnode) {
|
int32_t mndInitSma(SMnode *pMnode) {
|
||||||
|
@ -406,7 +406,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
|
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
@ -707,7 +707,8 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
|
||||||
SNode *node = NULL;
|
SNode *node = NULL;
|
||||||
FOREACH(node, pList) {
|
FOREACH(node, pList) {
|
||||||
SFunctionNode *pFunc = (SFunctionNode *)node;
|
SFunctionNode *pFunc = (SFunctionNode *)node;
|
||||||
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName);
|
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
|
||||||
|
(extOffset ? "," : ""), pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
*exist = true;
|
*exist = true;
|
||||||
|
@ -718,13 +719,12 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pRsp);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
|
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -758,8 +758,8 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
|
||||||
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
|
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) n, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)n, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp);
|
||||||
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
|
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
|
||||||
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
|
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitStream(SMnode *pMnode) {
|
int32_t mndInitStream(SMnode *pMnode) {
|
||||||
|
@ -58,8 +58,8 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
|
||||||
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
@ -294,8 +294,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
mDebug("stream:%s to create", pCreate->name);
|
mDebug("stream:%s to create", pCreate->name);
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(streamObj.targetSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
@ -424,58 +424,55 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
int32_t cols = 0;
|
|
||||||
char *pWrite;
|
|
||||||
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
|
||||||
if (pDb == NULL) return 0;
|
|
||||||
|
|
||||||
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
|
||||||
strcat(prefix, TS_PATH_DELIMITER);
|
|
||||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
if (pStream->dbUid != pDb->uid) {
|
SColumnInfoData *pColInfo;
|
||||||
if (strncmp(pStream->name, prefix, prefixLen) != 0) {
|
SName n;
|
||||||
mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid);
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tNameFromString(&n, pStream->name, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
tNameGetDbName(&n, varDataVal(streamName));
|
||||||
|
varDataSetLen(streamName, strlen(varDataVal(streamName)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
||||||
|
|
||||||
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tstrncpy(&sql[VARSTR_HEADER_SIZE], pStream->sql, TSDB_SHOW_SQL_LEN);
|
||||||
|
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
sdbRelease(pSdb, pStream);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
cols = 0;
|
|
||||||
|
|
||||||
char streamName[TSDB_TABLE_NAME_LEN] = {0};
|
|
||||||
tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN);
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
STR_TO_VARSTR(pWrite, streamName);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int64_t *)pWrite = pStream->createTime;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
sdbRelease(pSdb, pStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
pShow->numOfRows += numOfRows;
|
|
||||||
return numOfRows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
||||||
|
|
|
@ -309,9 +309,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
newConsumerEp.consumerId = consumerId;
|
newConsumerEp.consumerId = consumerId;
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
/*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
|
|
||||||
/*ASSERT(pTestNew->consumerId == consumerId);*/
|
|
||||||
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
|
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -369,7 +366,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8. generate logs
|
// 8. TODO generate logs
|
||||||
|
mInfo("rebalance calculation completed, rebalanced vg:");
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||||
|
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||||
|
mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId,
|
||||||
|
pOutputRebVg->newConsumerId);
|
||||||
|
}
|
||||||
|
|
||||||
// 9. clear
|
// 9. clear
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
|
@ -447,7 +450,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 4. commit log: modification log
|
// 4. TODO commit log: modification log
|
||||||
|
|
||||||
|
// 5. execution
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
|
@ -6,5 +6,5 @@ target_include_directories(
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
sdb os common util
|
sdb os common util wal
|
||||||
)
|
)
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "sdbInt.h"
|
#include "sdbInt.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
#define SDB_TABLE_SIZE 24
|
#define SDB_TABLE_SIZE 24
|
||||||
#define SDB_RESERVE_SIZE 512
|
#define SDB_RESERVE_SIZE 512
|
||||||
|
@ -137,7 +138,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
int32_t readLen = 0;
|
int32_t readLen = 0;
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
|
||||||
SSdbRaw *pRaw = taosMemoryMalloc(SDB_MAX_SIZE);
|
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
|
||||||
if (pRaw == NULL) {
|
if (pRaw == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed read file since %s", terrstr());
|
mError("failed read file since %s", terrstr());
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
#include "qndInt.h"
|
#include "qndInt.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
//#include "tudf.h"
|
#include "libs/function/function.h"
|
||||||
|
|
||||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
||||||
|
@ -26,7 +26,9 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
//udfcOpen();
|
if (udfcOpen() != 0) {
|
||||||
|
qError("qnode can not open udfc");
|
||||||
|
}
|
||||||
|
|
||||||
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
||||||
taosMemoryFreeClear(pQnode);
|
taosMemoryFreeClear(pQnode);
|
||||||
|
@ -40,7 +42,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
void qndClose(SQnode *pQnode) {
|
void qndClose(SQnode *pQnode) {
|
||||||
qWorkerDestroy((void **)&pQnode->pQuery);
|
qWorkerDestroy((void **)&pQnode->pQuery);
|
||||||
|
|
||||||
//udfcClose();
|
udfcClose();
|
||||||
|
|
||||||
taosMemoryFree(pQnode);
|
taosMemoryFree(pQnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,7 +61,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
len = pMsg->contLen - sizeof(SMsgHead);
|
len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
// todo: change the interface here
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -51,6 +51,21 @@ target_link_libraries(
|
||||||
udf1 PUBLIC os
|
udf1 PUBLIC os
|
||||||
)
|
)
|
||||||
|
|
||||||
|
add_library(udf2 MODULE test/udf2.c)
|
||||||
|
target_include_directories(
|
||||||
|
udf2
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/function"
|
||||||
|
"${TD_SOURCE_DIR}/include/util"
|
||||||
|
"${TD_SOURCE_DIR}/include/common"
|
||||||
|
"${TD_SOURCE_DIR}/include/client"
|
||||||
|
"${TD_SOURCE_DIR}/include/os"
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
)
|
||||||
|
target_link_libraries(
|
||||||
|
udf2 PUBLIC os
|
||||||
|
)
|
||||||
|
|
||||||
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
|
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
|
||||||
add_executable(udfd src/udfd.c)
|
add_executable(udfd src/udfd.c)
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||||
|
|
||||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
|
@ -43,6 +43,9 @@ typedef struct SUdfSetupRequest {
|
||||||
|
|
||||||
typedef struct SUdfSetupResponse {
|
typedef struct SUdfSetupResponse {
|
||||||
int64_t udfHandle;
|
int64_t udfHandle;
|
||||||
|
int8_t outputType;
|
||||||
|
int32_t outputLen;
|
||||||
|
int32_t bufSize;
|
||||||
} SUdfSetupResponse;
|
} SUdfSetupResponse;
|
||||||
|
|
||||||
typedef struct SUdfCallRequest {
|
typedef struct SUdfCallRequest {
|
||||||
|
|
|
@ -139,6 +139,20 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
|
||||||
|
cleanupResultRowEntry(pResInfo);
|
||||||
|
|
||||||
|
char* in = finalResult;
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
|
||||||
|
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
||||||
|
|
|
@ -21,6 +21,10 @@
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
#include "builtins.h"
|
||||||
|
#include "catalog.h"
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
|
|
||||||
typedef struct SFuncMgtService {
|
typedef struct SFuncMgtService {
|
||||||
SHashObj* pFuncNameHashTable;
|
SHashObj* pFuncNameHashTable;
|
||||||
|
@ -120,6 +124,14 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) {
|
||||||
|
pFpSet->getEnv = udfAggGetEnv;
|
||||||
|
pFpSet->init = udfAggInit;
|
||||||
|
pFpSet->process = udfAggProcess;
|
||||||
|
pFpSet->finalize = udfAggFinalize;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
#include "tudfInt.h"
|
#include "tudfInt.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "builtinsimpl.h"
|
||||||
|
#include "functionMgt.h"
|
||||||
|
|
||||||
//TODO: network error processing.
|
//TODO: network error processing.
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
|
@ -147,6 +150,10 @@ typedef struct SUdfUvSession {
|
||||||
SUdfdProxy *udfc;
|
SUdfdProxy *udfc;
|
||||||
int64_t severHandle;
|
int64_t severHandle;
|
||||||
uv_pipe_t *udfSvcPipe;
|
uv_pipe_t *udfSvcPipe;
|
||||||
|
|
||||||
|
int8_t outputType;
|
||||||
|
int32_t outputLen;
|
||||||
|
int32_t bufSize;
|
||||||
} SUdfUvSession;
|
} SUdfUvSession;
|
||||||
|
|
||||||
typedef struct SClientUvTaskNode {
|
typedef struct SClientUvTaskNode {
|
||||||
|
@ -235,12 +242,14 @@ void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
|
||||||
|
|
||||||
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
|
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
len += taosEncodeFixedI8(buf, state->numOfResult);
|
||||||
len += taosEncodeFixedI32(buf, state->bufLen);
|
len += taosEncodeFixedI32(buf, state->bufLen);
|
||||||
len += taosEncodeBinary(buf, state->buf, state->bufLen);
|
len += taosEncodeBinary(buf, state->buf, state->bufLen);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
|
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
|
||||||
|
buf = taosDecodeFixedI8(buf, &state->numOfResult);
|
||||||
buf = taosDecodeFixedI32(buf, &state->bufLen);
|
buf = taosDecodeFixedI32(buf, &state->bufLen);
|
||||||
buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
|
buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
|
@ -342,11 +351,17 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
|
||||||
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
||||||
|
len += taosEncodeFixedI8(buf, setupRsp->outputType);
|
||||||
|
len += taosEncodeFixedI32(buf, setupRsp->outputLen);
|
||||||
|
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
||||||
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
||||||
|
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
|
||||||
|
buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
|
||||||
|
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,6 +1064,9 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
|
|
||||||
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
||||||
task->session->severHandle = rsp->udfHandle;
|
task->session->severHandle = rsp->udfHandle;
|
||||||
|
task->session->outputType = rsp->outputType;
|
||||||
|
task->session->outputLen = rsp->outputLen;
|
||||||
|
task->session->bufSize = rsp->bufSize;
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
fnError("failed to setup udf. err: %d", task->errCode)
|
fnError("failed to setup udf. err: %d", task->errCode)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1197,3 +1215,122 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
|
||||||
|
typedef struct SUdfAggRes {
|
||||||
|
SUdfUvSession *session;
|
||||||
|
int8_t finalResNum;
|
||||||
|
int8_t interResNum;
|
||||||
|
char* finalResBuf;
|
||||||
|
char* interResBuf;
|
||||||
|
} SUdfAggRes;
|
||||||
|
|
||||||
|
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
if (fmIsScalarFunc(pFunc->funcId)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
|
||||||
|
if (functionSetup(pCtx, pResultCellInfo) != true) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
UdfcFuncHandle handle;
|
||||||
|
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||||
|
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||||
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
|
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
||||||
|
memset(udfRes, 0, envSize);
|
||||||
|
|
||||||
|
udfRes->session = (SUdfUvSession *)handle;
|
||||||
|
SUdfInterBuf buf = {0};
|
||||||
|
if (callUdfAggInit(handle, &buf) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
udfRes->interResNum = buf.numOfResult;
|
||||||
|
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
int32_t numOfCols = pInput->numOfInputCols;
|
||||||
|
|
||||||
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SUdfUvSession *session = udfRes->session;
|
||||||
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
|
|
||||||
|
SSDataBlock tempBlock = {0};
|
||||||
|
tempBlock.info.numOfCols = numOfCols;
|
||||||
|
tempBlock.info.rows = numOfRows;
|
||||||
|
tempBlock.info.uid = pInput->uid;
|
||||||
|
bool hasVarCol = false;
|
||||||
|
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData *col = pInput->pData[i];
|
||||||
|
if (IS_VAR_DATA_TYPE(col->info.type)) {
|
||||||
|
hasVarCol = true;
|
||||||
|
}
|
||||||
|
taosArrayPush(tempBlock.pDataBlock, col);
|
||||||
|
}
|
||||||
|
tempBlock.info.hasVarCol = hasVarCol;
|
||||||
|
|
||||||
|
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
|
||||||
|
|
||||||
|
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||||
|
.bufLen = session->bufSize,
|
||||||
|
.numOfResult = udfRes->interResNum};
|
||||||
|
SUdfInterBuf newState = {0};
|
||||||
|
|
||||||
|
callUdfAggProcess(session, inputBlock, &state, &newState);
|
||||||
|
|
||||||
|
udfRes->interResNum = newState.numOfResult;
|
||||||
|
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
|
||||||
|
|
||||||
|
if (newState.numOfResult == 1 || state.numOfResult == 1) {
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(inputBlock);
|
||||||
|
|
||||||
|
taosArrayDestroy(tempBlock.pDataBlock);
|
||||||
|
|
||||||
|
taosMemoryFree(newState.buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SUdfUvSession *session = udfRes->session;
|
||||||
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
|
|
||||||
|
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf,
|
||||||
|
.bufLen = session->outputLen,
|
||||||
|
.numOfResult = udfRes->finalResNum};
|
||||||
|
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||||
|
.bufLen = session->bufSize,
|
||||||
|
.numOfResult = udfRes->interResNum};
|
||||||
|
callUdfAggFinalize(session, &state, &resultBuf);
|
||||||
|
teardownUdf(session);
|
||||||
|
|
||||||
|
if (resultBuf.numOfResult == 1) {
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||||
|
}
|
||||||
|
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||||
|
}
|
|
@ -77,6 +77,10 @@ typedef struct SUdf {
|
||||||
uv_lib_t lib;
|
uv_lib_t lib;
|
||||||
TUdfScalarProcFunc scalarProcFunc;
|
TUdfScalarProcFunc scalarProcFunc;
|
||||||
TUdfFreeUdfColumnFunc freeUdfColumn;
|
TUdfFreeUdfColumnFunc freeUdfColumn;
|
||||||
|
|
||||||
|
TUdfAggStartFunc aggStartFunc;
|
||||||
|
TUdfAggProcessFunc aggProcFunc;
|
||||||
|
TUdfAggFinishFunc aggFinishFunc;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
||||||
|
@ -97,15 +101,32 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
return UDFC_CODE_LOAD_UDF_FAILURE;
|
return UDFC_CODE_LOAD_UDF_FAILURE;
|
||||||
}
|
}
|
||||||
// TODO: find all the functions
|
//TODO: init and destroy function
|
||||||
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
||||||
strcpy(normalFuncName, udfName);
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
strcpy(processFuncName, udfName);
|
||||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
|
||||||
|
char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
||||||
char *freeSuffix = "_free";
|
char *freeSuffix = "_free";
|
||||||
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
|
strncpy(freeFuncName, processFuncName, strlen(processFuncName));
|
||||||
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
||||||
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
||||||
|
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
|
||||||
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
|
strcpy(processFuncName, udfName);
|
||||||
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
|
||||||
|
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||||
|
char *startSuffix = "_start";
|
||||||
|
strncpy(startFuncName, processFuncName, strlen(processFuncName));
|
||||||
|
strncat(startFuncName, startSuffix, strlen(startSuffix));
|
||||||
|
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
|
||||||
|
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
|
||||||
|
char *finishSuffix = "_finish";
|
||||||
|
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
||||||
|
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
||||||
|
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc));
|
||||||
|
//TODO: merge
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,6 +181,9 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
rsp.type = request.type;
|
rsp.type = request.type;
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||||
|
rsp.setupRsp.outputType = udf->outputType;
|
||||||
|
rsp.setupRsp.outputLen = udf->outputLen;
|
||||||
|
rsp.setupRsp.bufSize = udf->bufSize;
|
||||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||||
rsp.msgLen = len;
|
rsp.msgLen = len;
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
|
@ -178,25 +202,57 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
call->udfHandle);
|
call->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||||
SUdf *udf = handle->udf;
|
SUdf *udf = handle->udf;
|
||||||
|
SUdfResponse response = {0};
|
||||||
|
SUdfResponse *rsp = &response;
|
||||||
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||||
|
|
||||||
|
switch(call->callType) {
|
||||||
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
|
SUdfColumn output = {0};
|
||||||
|
|
||||||
SUdfDataBlock input = {0};
|
SUdfDataBlock input = {0};
|
||||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
SUdfColumn output = {0};
|
udf->scalarProcFunc(&input, &output);
|
||||||
// TODO: call different functions according to call type, for now just calar
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||||
udf->scalarProcFunc(input, &output);
|
udf->freeUdfColumn(&output);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_INIT: {
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggStartFunc(&outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
|
SUdfDataBlock input = {0};
|
||||||
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggProcFunc(&input, &outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_FIN: {
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUdfResponse response = {0};
|
|
||||||
SUdfResponse *rsp = &response;
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
|
||||||
rsp->seqNum = request.seqNum;
|
rsp->seqNum = request.seqNum;
|
||||||
rsp->type = request.type;
|
rsp->type = request.type;
|
||||||
rsp->code = 0;
|
rsp->code = 0;
|
||||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
||||||
subRsp->callType = call->callType;
|
subRsp->callType = call->callType;
|
||||||
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||||
rsp->msgLen = len;
|
rsp->msgLen = len;
|
||||||
|
@ -205,9 +261,6 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
encodeUdfResponse(&buf, rsp);
|
encodeUdfResponse(&buf, rsp);
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
|
|
||||||
// TODO: free udf column
|
|
||||||
udf->freeUdfColumn(&output);
|
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
taosMemoryFree(uvUdf->input.base);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,18 +9,18 @@
|
||||||
#undef free
|
#undef free
|
||||||
#define free free
|
#define free free
|
||||||
|
|
||||||
int32_t udf1_setup() {
|
int32_t udf1_init() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udf1_teardown() {
|
int32_t udf1_destroy() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) {
|
int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
|
||||||
SUdfColumnData *resultData = &resultCol->colData;
|
SUdfColumnData *resultData = &resultCol->colData;
|
||||||
resultData->numOfRows = block.numOfRows;
|
resultData->numOfRows = block->numOfRows;
|
||||||
SUdfColumnData *srcData = &block.udfCols[0]->colData;
|
SUdfColumnData *srcData = &block->udfCols[0]->colData;
|
||||||
resultData->varLengthColumn = srcData->varLengthColumn;
|
resultData->varLengthColumn = srcData->varLengthColumn;
|
||||||
|
|
||||||
if (resultData->varLengthColumn) {
|
if (resultData->varLengthColumn) {
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
#include <string.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
|
#undef malloc
|
||||||
|
#define malloc malloc
|
||||||
|
#undef free
|
||||||
|
#define free free
|
||||||
|
|
||||||
|
int32_t udf2_init() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udf2_destroy() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udf2_start(SUdfInterBuf *buf) {
|
||||||
|
*(int64_t*)(buf->buf) = 0;
|
||||||
|
buf->bufLen = sizeof(int64_t);
|
||||||
|
buf->numOfResult = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
|
||||||
|
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
||||||
|
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||||
|
for (int32_t j = 0; j < block->numOfRows; ++i) {
|
||||||
|
SUdfColumn* col = block->udfCols[i];
|
||||||
|
//TODO: check the bitmap for null value
|
||||||
|
int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
|
||||||
|
sumSquares += rows[j] * rows[j];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*(int64_t*)interBuf = sumSquares;
|
||||||
|
interBuf->bufLen = sizeof(int64_t);
|
||||||
|
//TODO: if all null value, numOfResult = 0;
|
||||||
|
interBuf->numOfResult = 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
|
||||||
|
//TODO: check numOfResults;
|
||||||
|
int64_t sumSquares = *(int64_t*)(buf->buf);
|
||||||
|
*(double*)(resultData->buf) = sqrt(sumSquares);
|
||||||
|
resultData->bufLen = sizeof(double);
|
||||||
|
resultData->numOfResult = 1;
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -7,6 +7,7 @@
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
int32_t scalarGetOperatorParamNum(EOperatorType type) {
|
int32_t scalarGetOperatorParamNum(EOperatorType type) {
|
||||||
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
|
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
|
||||||
|
@ -336,14 +337,12 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||||
|
|
||||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
#if 0
|
|
||||||
UdfcFuncHandle udfHandle = NULL;
|
UdfcFuncHandle udfHandle = NULL;
|
||||||
|
|
||||||
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
||||||
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||||
teardownUdf(udfHandle);
|
teardownUdf(udfHandle);
|
||||||
SCL_ERR_JRET(code);
|
SCL_ERR_JRET(code);
|
||||||
#endif
|
|
||||||
} else {
|
} else {
|
||||||
SScalarFuncExecFuncs ffpSet = {0};
|
SScalarFuncExecFuncs ffpSet = {0};
|
||||||
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tstreamUpdate.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
|
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||||
|
#define DEFAULT_BUCKET_SIZE 1024
|
||||||
|
#define ROWS_PER_MILLISECOND 1
|
||||||
|
#define MAX_NUM_SCALABLE_BF 120
|
||||||
|
#define MIN_NUM_SCALABLE_BF 10
|
||||||
|
#define DEFAULT_PREADD_BUCKET 1
|
||||||
|
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||||
|
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||||
|
|
||||||
|
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||||
|
if (pInfo->numSBFs < count ) {
|
||||||
|
count = pInfo->numSBFs;
|
||||||
|
}
|
||||||
|
for (uint64_t i = 0; i < count; ++i) {
|
||||||
|
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
|
||||||
|
DEFAULT_FALSE_POSITIVE);
|
||||||
|
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
|
||||||
|
if (count < pInfo->numSBFs - 1) {
|
||||||
|
for (uint64_t i = 0; i < count; ++i) {
|
||||||
|
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||||
|
tScalableBfDestroy(pTsSBFs);
|
||||||
|
taosArrayRemove(pInfo->pTsSBFs, i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy);
|
||||||
|
}
|
||||||
|
pInfo->minTS += pInfo->interval * count;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
||||||
|
int64_t val = interval;
|
||||||
|
if (precision != TSDB_TIME_PRECISION_MILLI) {
|
||||||
|
val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI);
|
||||||
|
}
|
||||||
|
if (val < MIN_INTERVAL) {
|
||||||
|
val = MIN_INTERVAL;
|
||||||
|
} else if (val > MAX_INTERVAL) {
|
||||||
|
val = MAX_INTERVAL;
|
||||||
|
}
|
||||||
|
val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision);
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) {
|
||||||
|
return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) {
|
||||||
|
SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pInfo->pTsBuckets = NULL;
|
||||||
|
pInfo->pTsSBFs = NULL;
|
||||||
|
pInfo->minTS = -1;
|
||||||
|
pInfo->interval = adjustInterval(interval, precision);
|
||||||
|
pInfo->watermark = watermark;
|
||||||
|
|
||||||
|
uint64_t bfSize = (uint64_t)(watermark / pInfo->interval);
|
||||||
|
if (bfSize < MIN_NUM_SCALABLE_BF) {
|
||||||
|
bfSize = MIN_NUM_SCALABLE_BF;
|
||||||
|
} else if (bfSize > MAX_NUM_SCALABLE_BF) {
|
||||||
|
bfSize = MAX_NUM_SCALABLE_BF;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf));
|
||||||
|
if (pInfo->pTsSBFs == NULL) {
|
||||||
|
updateInfoDestroy(pInfo);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pInfo->numSBFs = bfSize;
|
||||||
|
windowSBfAdd(pInfo, bfSize);
|
||||||
|
|
||||||
|
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
|
||||||
|
if (pInfo->pTsBuckets == NULL) {
|
||||||
|
updateInfoDestroy(pInfo);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY dumy = 0;
|
||||||
|
for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
||||||
|
taosArrayPush(pInfo->pTsBuckets, &dumy);
|
||||||
|
}
|
||||||
|
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
|
||||||
|
return pInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
||||||
|
if (ts <= 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (pInfo->minTS < 0) {
|
||||||
|
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
|
||||||
|
}
|
||||||
|
uint64_t index = (uint64_t)((ts - pInfo->minTS) / pInfo->interval);
|
||||||
|
if (index >= pInfo->numSBFs) {
|
||||||
|
uint64_t count = index + 1 - pInfo->numSBFs;
|
||||||
|
windowSBfDelete(pInfo, count);
|
||||||
|
windowSBfAdd(pInfo, count);
|
||||||
|
index = pInfo->numSBFs - 1;
|
||||||
|
}
|
||||||
|
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
|
||||||
|
if (res == NULL) {
|
||||||
|
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
|
||||||
|
DEFAULT_FALSE_POSITIVE);
|
||||||
|
taosArrayPush(pInfo->pTsSBFs, &res);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
|
||||||
|
int32_t res = TSDB_CODE_FAILED;
|
||||||
|
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||||
|
SScalableBf* pSBf = getSBf(pInfo, ts);
|
||||||
|
// pSBf may be a null pointer
|
||||||
|
if (pSBf) {
|
||||||
|
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||||
|
if (maxTs < ts ) {
|
||||||
|
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ts < pInfo->minTS) {
|
||||||
|
return true;
|
||||||
|
} else if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
//check from tsdb api
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pInfo->pTsBuckets);
|
||||||
|
|
||||||
|
uint64_t size = taosArrayGetSize(pInfo->pTsSBFs);
|
||||||
|
for (uint64_t i = 0; i < size; i++) {
|
||||||
|
SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||||
|
tScalableBfDestroy(pSBF);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pInfo->pTsSBFs);
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
|
||||||
|
MESSAGE(STATUS "build stream unit test")
|
||||||
|
|
||||||
|
# GoogleTest requires at least C++11
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
|
# bloomFilterTest
|
||||||
|
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
|
||||||
|
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
streamUpdateTest
|
||||||
|
PUBLIC os util common gtest stream
|
||||||
|
)
|
||||||
|
|
||||||
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
streamUpdateTest
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
|
||||||
|
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
|
||||||
|
)
|
|
@ -0,0 +1,103 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include "tstreamUpdate.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
TEST(TD_STREAM_UPDATE_TEST, update) {
|
||||||
|
int64_t interval = 20 * 1000;
|
||||||
|
int64_t watermark = 10 * 60 * 1000;
|
||||||
|
SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,1, 0), true);
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,1, -1), true);
|
||||||
|
|
||||||
|
for(int i=0; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), false);
|
||||||
|
}
|
||||||
|
for(int i=0; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i=0; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), false);
|
||||||
|
}
|
||||||
|
for(int i=0; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i=0; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i=3; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,0, i), false);
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
|
||||||
|
|
||||||
|
for(int i=3; i < 1024; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU,0, i), true);
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
|
||||||
|
|
||||||
|
SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
|
||||||
|
for(int i=1; i <= watermark / interval; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->minTS, interval);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
|
||||||
|
}
|
||||||
|
for(int i=0; i < pSU1->numSBFs; i++) {
|
||||||
|
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, i);
|
||||||
|
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0);
|
||||||
|
GTEST_ASSERT_EQ(pBF->size, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i= watermark / interval + 1, j = 2 ; i <= watermark / interval + 10; i++,j++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->minTS, interval*j);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
|
||||||
|
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, pSU1->numSBFs - 1);
|
||||||
|
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0);
|
||||||
|
GTEST_ASSERT_EQ(pBF->size, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 2), j++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->minTS, (i-(pSU1->numSBFs-1))*interval);
|
||||||
|
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateInfo *pSU2 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU2, 1, 1 * interval + 5), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU2->minTS, interval);
|
||||||
|
for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 10), j++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU2, 1, i * interval + 5), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval);
|
||||||
|
GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval);
|
||||||
|
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
|
||||||
|
for(int j = 1; j < 100; j++) {
|
||||||
|
for(int i = 0; i < pSU3->numSBFs; i++) {
|
||||||
|
GTEST_ASSERT_EQ(isUpdated(pSU3, i, i * interval + 5 * j), false);
|
||||||
|
GTEST_ASSERT_EQ(pSU3->minTS, 0);
|
||||||
|
GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval);
|
||||||
|
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j);
|
||||||
|
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU3->pTsSBFs, i);
|
||||||
|
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0);
|
||||||
|
GTEST_ASSERT_EQ(pBF->size, j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
updateInfoDestroy(pSU);
|
||||||
|
updateInfoDestroy(pSU1);
|
||||||
|
updateInfoDestroy(pSU2);
|
||||||
|
updateInfoDestroy(pSU3);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
|
@ -348,14 +348,13 @@ typedef struct SDelayQueue {
|
||||||
uv_timer_t* timer;
|
uv_timer_t* timer;
|
||||||
Heap* heap;
|
Heap* heap;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
void (*free)(void* arg);
|
|
||||||
} SDelayQueue;
|
} SDelayQueue;
|
||||||
|
|
||||||
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue);
|
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
||||||
|
|
||||||
void transDestroyDelayQueue(SDelayQueue* queue);
|
void transDQDestroy(SDelayQueue* queue);
|
||||||
|
|
||||||
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* init global func
|
* init global func
|
||||||
|
|
|
@ -842,7 +842,7 @@ static SCliThrdObj* createThrdObj() {
|
||||||
|
|
||||||
pThrd->pool = createConnPool(4);
|
pThrd->pool = createConnPool(4);
|
||||||
|
|
||||||
transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue);
|
transDQCreate(pThrd->loop, &pThrd->delayQueue);
|
||||||
|
|
||||||
pThrd->quit = false;
|
pThrd->quit = false;
|
||||||
return pThrd;
|
return pThrd;
|
||||||
|
@ -857,7 +857,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
taosThreadMutexDestroy(&pThrd->msgMtx);
|
taosThreadMutexDestroy(&pThrd->msgMtx);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
|
||||||
transDestroyDelayQueue(pThrd->delayQueue);
|
transDQDestroy(pThrd->delayQueue);
|
||||||
taosMemoryFree(pThrd->loop);
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
|
@ -923,14 +923,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
|
|
||||||
cliDestroy((uv_handle_t*)pConn->stream);
|
cliDestroy((uv_handle_t*)pConn->stream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
|
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
|
||||||
if (pResp->contLen == 0) {
|
if (pResp->contLen == 0) {
|
||||||
pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps;
|
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||||
} else {
|
} else {
|
||||||
SMEpSet emsg = {0};
|
SMEpSet emsg = {0};
|
||||||
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
||||||
|
@ -940,7 +940,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
|
|
||||||
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
addConnToPool(pThrd, pConn);
|
addConnToPool(pThrd, pConn);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -369,7 +369,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void transDelayQueueTimeout(uv_timer_t* timer) {
|
static void transDQTimeout(uv_timer_t* timer) {
|
||||||
SDelayQueue* queue = timer->data;
|
SDelayQueue* queue = timer->data;
|
||||||
tTrace("timer %p timeout", timer);
|
tTrace("timer %p timeout", timer);
|
||||||
uint64_t timeout = 0;
|
uint64_t timeout = 0;
|
||||||
|
@ -388,10 +388,10 @@ static void transDelayQueueTimeout(uv_timer_t* timer) {
|
||||||
}
|
}
|
||||||
} while (1);
|
} while (1);
|
||||||
if (timeout != 0) {
|
if (timeout != 0) {
|
||||||
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0);
|
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
|
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
|
||||||
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
uv_timer_init(loop, timer);
|
uv_timer_init(loop, timer);
|
||||||
|
|
||||||
|
@ -407,7 +407,7 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transDestroyDelayQueue(SDelayQueue* queue) {
|
void transDQDestroy(SDelayQueue* queue) {
|
||||||
taosMemoryFree(queue->timer);
|
taosMemoryFree(queue->timer);
|
||||||
|
|
||||||
while (heapSize(queue->heap) > 0) {
|
while (heapSize(queue->heap) > 0) {
|
||||||
|
@ -424,19 +424,15 @@ void transDestroyDelayQueue(SDelayQueue* queue) {
|
||||||
taosMemoryFree(queue);
|
taosMemoryFree(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
||||||
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
||||||
|
|
||||||
task->func = func;
|
task->func = func;
|
||||||
task->arg = arg;
|
task->arg = arg;
|
||||||
task->execTime = taosGetTimestampMs() + timeoutMs;
|
task->execTime = taosGetTimestampMs() + timeoutMs;
|
||||||
|
|
||||||
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
||||||
heapInsert(queue->heap, &task->node);
|
heapInsert(queue->heap, &task->node);
|
||||||
if (heapSize(queue->heap) == 1) {
|
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
|
||||||
uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -318,6 +318,20 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
|
||||||
pArray->size = 0;
|
pArray->size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosArrayClearP(SArray* pArray, FDelete fp) {
|
||||||
|
if (pArray == NULL) return;
|
||||||
|
if (fp == NULL) {
|
||||||
|
pArray->size = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pArray->size; ++i) {
|
||||||
|
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
||||||
|
}
|
||||||
|
|
||||||
|
pArray->size = 0;
|
||||||
|
}
|
||||||
|
|
||||||
void* taosArrayDestroy(SArray* pArray) {
|
void* taosArrayDestroy(SArray* pArray) {
|
||||||
if (pArray) {
|
if (pArray) {
|
||||||
taosMemoryFree(pArray->pData);
|
taosMemoryFree(pArray->pData);
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include "tbloomfilter.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
|
||||||
|
#define UNIT_NUM_BITS 64
|
||||||
|
#define UNIT_ADDR_NUM_BITS 6
|
||||||
|
|
||||||
|
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
|
||||||
|
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
||||||
|
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
|
||||||
|
uint64_t old = buf[unitIndex];
|
||||||
|
buf[unitIndex] |= mask;
|
||||||
|
return buf[unitIndex] != old;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) {
|
||||||
|
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
||||||
|
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
|
||||||
|
return buf[unitIndex] & mask;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
||||||
|
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
|
||||||
|
if (pBF == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pBF->expectedEntries = expectedEntries;
|
||||||
|
pBF->errorRate = errorRate;
|
||||||
|
|
||||||
|
double lnRate = fabs(log(errorRate));
|
||||||
|
// ln(2)^2 = 0.480453013918201
|
||||||
|
// m = - n * ln(P) / ( ln(2) )^2
|
||||||
|
// m is the size of bloom filter, n is expected entries, P is false positive probability
|
||||||
|
pBF->numUnits = (uint64_t) ceil(expectedEntries * lnRate / 0.480453013918201 / UNIT_NUM_BITS);
|
||||||
|
pBF->numBits = pBF->numUnits * 64;
|
||||||
|
pBF->size = 0;
|
||||||
|
|
||||||
|
// ln(2) = 0.693147180559945
|
||||||
|
pBF->hashFunctions = (uint32_t) ceil(lnRate / 0.693147180559945);
|
||||||
|
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
|
||||||
|
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||||
|
if (pBF->buffer == NULL) {
|
||||||
|
tBloomFilterDestroy(pBF);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pBF;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
|
||||||
|
ASSERT(!tBloomFilterIsFull(pBF));
|
||||||
|
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
||||||
|
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
||||||
|
bool hasChange = false;
|
||||||
|
const register uint64_t size = pBF->numBits;
|
||||||
|
uint64_t cbHash = h1;
|
||||||
|
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||||
|
hasChange |= setBit(pBF->buffer, cbHash % size);
|
||||||
|
cbHash += h2;
|
||||||
|
}
|
||||||
|
if (hasChange) {
|
||||||
|
pBF->size++;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
|
||||||
|
uint32_t len) {
|
||||||
|
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
||||||
|
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
||||||
|
const register uint64_t size = pBF->numBits;
|
||||||
|
uint64_t cbHash = h1;
|
||||||
|
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||||
|
if (!getBit(pBF->buffer, cbHash % size)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
cbHash += h2;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tBloomFilterDestroy(SBloomFilter *pBF) {
|
||||||
|
if (pBF == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosMemoryFree(pBF->buffer);
|
||||||
|
taosMemoryFree(pBF);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tBloomFilterDump(const struct SBloomFilter *pBF) {
|
||||||
|
// ToDo
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tBloomFilterIsFull(const SBloomFilter *pBF) {
|
||||||
|
return pBF->size >= pBF->expectedEntries;
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include "tscalablebf.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
|
||||||
|
#define DEFAULT_GROWTH 2
|
||||||
|
#define DEFAULT_TIGHTENING_RATIO 0.5
|
||||||
|
|
||||||
|
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
|
||||||
|
double errorRate);
|
||||||
|
|
||||||
|
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
||||||
|
const uint32_t defaultSize = 8;
|
||||||
|
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
|
||||||
|
if (pSBf == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pSBf->numBits = 0;
|
||||||
|
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
|
||||||
|
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL ) {
|
||||||
|
tScalableBfDestroy(pSBf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pSBf->growth = DEFAULT_GROWTH;
|
||||||
|
return pSBf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
|
for (int32_t i = size - 2; i >= 0; --i) {
|
||||||
|
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i),
|
||||||
|
keyBuf, len) != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||||
|
ASSERT(pNormalBf);
|
||||||
|
if (tBloomFilterIsFull(pNormalBf)) {
|
||||||
|
pNormalBf = tScalableBfAddFilter(pSBf,
|
||||||
|
pNormalBf->expectedEntries * pSBf->growth,
|
||||||
|
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||||
|
if (pNormalBf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tBloomFilterPut(pNormalBf, keyBuf, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
|
||||||
|
uint32_t len) {
|
||||||
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
|
for (int32_t i = size - 1; i >= 0; --i) {
|
||||||
|
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i),
|
||||||
|
keyBuf, len) != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
|
||||||
|
double errorRate) {
|
||||||
|
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
|
||||||
|
if (pNormalBf == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if(taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) {
|
||||||
|
tBloomFilterDestroy(pNormalBf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pSBf->numBits += pNormalBf->numBits;
|
||||||
|
return pNormalBf;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tScalableBfDestroy(SScalableBf *pSBf) {
|
||||||
|
if (pSBf == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pSBf->bfArray != NULL) {
|
||||||
|
taosArrayDestroyP(pSBf->bfArray, (FDelete)tBloomFilterDestroy);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pSBf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tScalableBfDump(const SScalableBf *pSBf) {
|
||||||
|
// Todo;
|
||||||
|
}
|
|
@ -60,3 +60,11 @@ add_test(
|
||||||
NAME cfgTest
|
NAME cfgTest
|
||||||
COMMAND cfgTest
|
COMMAND cfgTest
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# bloomFilterTest
|
||||||
|
add_executable(bloomFilterTest "bloomFilterTest.cpp")
|
||||||
|
target_link_libraries(bloomFilterTest os util gtest_main)
|
||||||
|
add_test(
|
||||||
|
NAME bloomFilterTest
|
||||||
|
COMMAND bloomFilterTest
|
||||||
|
)
|
|
@ -0,0 +1,140 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include "tscalablebf.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
|
||||||
|
int64_t ts1 = 1650803518000;
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01));
|
||||||
|
|
||||||
|
SBloomFilter *pBF1 = tBloomFilterInit(100, 0.005);
|
||||||
|
GTEST_ASSERT_EQ(pBF1->numBits, 1152);
|
||||||
|
GTEST_ASSERT_EQ(pBF1->numUnits, 1152/64);
|
||||||
|
int64_t count = 0;
|
||||||
|
for(int64_t i = 0; count < 100; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
if(tBloomFilterPut(pBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_TRUE(tBloomFilterIsFull(pBF1));
|
||||||
|
|
||||||
|
SBloomFilter *pBF2 = tBloomFilterInit(1000*10000, 0.1);
|
||||||
|
GTEST_ASSERT_EQ(pBF2->numBits, 47925312);
|
||||||
|
GTEST_ASSERT_EQ(pBF2->numUnits, 47925312/64);
|
||||||
|
|
||||||
|
SBloomFilter *pBF3 = tBloomFilterInit(10000*10000, 0.001);
|
||||||
|
GTEST_ASSERT_EQ(pBF3->numBits, 1437758784);
|
||||||
|
GTEST_ASSERT_EQ(pBF3->numUnits, 1437758784/64);
|
||||||
|
|
||||||
|
int64_t size = 10000;
|
||||||
|
SBloomFilter *pBF4 = tBloomFilterInit(size, 0.001);
|
||||||
|
for(int64_t i = 0; i < 1000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tBloomFilterPut(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
ASSERT_TRUE(!tBloomFilterIsFull(pBF4));
|
||||||
|
|
||||||
|
for(int64_t i = 0; i < 1000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int64_t i = 2000; i < 3000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
tBloomFilterDestroy(pBF1);
|
||||||
|
tBloomFilterDestroy(pBF2);
|
||||||
|
tBloomFilterDestroy(pBF3);
|
||||||
|
tBloomFilterDestroy(pBF4);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
|
||||||
|
int64_t ts1 = 1650803518000;
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 0));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 1));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, -0.1));
|
||||||
|
GTEST_ASSERT_EQ(NULL, tScalableBfInit(0, 0.01));
|
||||||
|
|
||||||
|
SScalableBf *pSBF1 = tScalableBfInit(100, 0.01);
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, 1152);
|
||||||
|
int64_t count = 0;
|
||||||
|
int64_t index = 0;
|
||||||
|
for( ; count < 100; index++) {
|
||||||
|
int64_t ts = index + ts1;
|
||||||
|
if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, 1152);
|
||||||
|
|
||||||
|
for( ; count < 300; index++) {
|
||||||
|
int64_t ts = index + ts1;
|
||||||
|
if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496);
|
||||||
|
|
||||||
|
for( ; count < 700; index++) {
|
||||||
|
int64_t ts = index + ts1;
|
||||||
|
if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496+5568);
|
||||||
|
|
||||||
|
for( ; count < 1500; index++) {
|
||||||
|
int64_t ts = index + ts1;
|
||||||
|
if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496+5568+12288);
|
||||||
|
|
||||||
|
int32_t aSize = taosArrayGetSize(pSBF1->bfArray);
|
||||||
|
int64_t totalBits = 0;
|
||||||
|
for(int64_t i = 0; i < aSize; i++) {
|
||||||
|
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF1->bfArray, i);
|
||||||
|
ASSERT_TRUE(tBloomFilterIsFull(pBF));
|
||||||
|
totalBits += pBF->numBits;
|
||||||
|
}
|
||||||
|
GTEST_ASSERT_EQ(pSBF1->numBits, totalBits);
|
||||||
|
|
||||||
|
for(int64_t i = 0; i < index; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF1, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int64_t size = 10000;
|
||||||
|
SScalableBf *pSBF4 = tScalableBfInit(size, 0.001);
|
||||||
|
for(int64_t i = 0; i < 1000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int64_t i = 0; i < 1000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int64_t i = 2000; i < 3000; i++) {
|
||||||
|
int64_t ts = i + ts1;
|
||||||
|
GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
tScalableBfDestroy(pSBF1);
|
||||||
|
tScalableBfDestroy(pSBF4);
|
||||||
|
|
||||||
|
}
|
|
@ -81,7 +81,7 @@
|
||||||
./test.sh -f tsim/insert/backquote.sim -m
|
./test.sh -f tsim/insert/backquote.sim -m
|
||||||
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
|
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
|
||||||
./test.sh -f tsim/query/interval-offset.sim -m
|
./test.sh -f tsim/query/interval-offset.sim -m
|
||||||
#./test.sh -f tsim/tmq/basic1.sim -m
|
./test.sh -f tsim/tmq/basic3.sim -m
|
||||||
./test.sh -f tsim/stable/vnode3.sim -m
|
./test.sh -f tsim/stable/vnode3.sim -m
|
||||||
./test.sh -f tsim/qnode/basic1.sim -m
|
./test.sh -f tsim/qnode/basic1.sim -m
|
||||||
./test.sh -f tsim/mnode/basic1.sim -m
|
./test.sh -f tsim/mnode/basic1.sim -m
|
||||||
|
|
Loading…
Reference in New Issue