add STSma defintion
This commit is contained in:
parent
0ea7e453bc
commit
ffeba1b1d4
|
@ -1,12 +1,12 @@
|
||||||
|
|
||||||
# stub
|
# stub
|
||||||
ExternalProject_Add(stub
|
#ExternalProject_Add(stub
|
||||||
GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git
|
# GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git
|
||||||
GIT_SUBMODULES "src"
|
# GIT_SUBMODULES "src"
|
||||||
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub"
|
# SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub"
|
||||||
BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src"
|
# BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src"
|
||||||
CONFIGURE_COMMAND ""
|
# CONFIGURE_COMMAND ""
|
||||||
BUILD_COMMAND ""
|
# BUILD_COMMAND ""
|
||||||
INSTALL_COMMAND ""
|
# INSTALL_COMMAND ""
|
||||||
TEST_COMMAND ""
|
# TEST_COMMAND ""
|
||||||
)
|
#)
|
||||||
|
|
|
@ -1834,6 +1834,135 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint8_t version; // for compatibility
|
||||||
|
uint8_t intervalUnit;
|
||||||
|
uint8_t slidingUnit;
|
||||||
|
char indexName[TSDB_INDEX_NAME_LEN + 1];
|
||||||
|
col_id_t numOfColIds;
|
||||||
|
uint16_t numOfFuncIds;
|
||||||
|
uint64_t tableUid; // super/common table uid
|
||||||
|
int64_t interval;
|
||||||
|
int64_t sliding;
|
||||||
|
col_id_t* colIds; // N.B. sorted column ids
|
||||||
|
uint16_t* funcIds; // N.B. sorted sma function ids
|
||||||
|
} STSma; // Time-range-wise SMA
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t number;
|
||||||
|
STSma* tSma;
|
||||||
|
} STSmaWrapper;
|
||||||
|
|
||||||
|
static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
|
||||||
|
if (pSma) {
|
||||||
|
tfree(pSma->colIds);
|
||||||
|
tfree(pSma->funcIds);
|
||||||
|
if (releaseSelf) {
|
||||||
|
free(pSma);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedU8(buf, pSma->version);
|
||||||
|
tlen += taosEncodeFixedU8(buf, pSma->intervalUnit);
|
||||||
|
tlen += taosEncodeFixedU8(buf, pSma->slidingUnit);
|
||||||
|
tlen += taosEncodeString(buf, pSma->indexName);
|
||||||
|
tlen += taosEncodeFixedU16(buf, pSma->numOfColIds);
|
||||||
|
tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds);
|
||||||
|
tlen += taosEncodeFixedU64(buf, pSma->tableUid);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pSma->interval);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pSma->sliding);
|
||||||
|
|
||||||
|
for (uint16_t i = 0; i < pSma->numOfColIds; ++i) {
|
||||||
|
tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i));
|
||||||
|
}
|
||||||
|
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
|
||||||
|
tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i));
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedU32(buf, pSW->number);
|
||||||
|
for (uint32_t i = 0; i < pSW->number; ++i) {
|
||||||
|
tlen += tEncodeTSma(buf, pSW->tSma + i);
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
|
||||||
|
buf = taosDecodeFixedU8(buf, &pSma->version);
|
||||||
|
buf = taosDecodeFixedU8(buf, &pSma->intervalUnit);
|
||||||
|
buf = taosDecodeFixedU8(buf, &pSma->slidingUnit);
|
||||||
|
buf = taosDecodeStringTo(buf, pSma->indexName);
|
||||||
|
buf = taosDecodeFixedU16(buf, &pSma->numOfColIds);
|
||||||
|
buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds);
|
||||||
|
buf = taosDecodeFixedU64(buf, &pSma->tableUid);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pSma->interval);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pSma->sliding);
|
||||||
|
|
||||||
|
if (pSma->numOfColIds > 0) {
|
||||||
|
pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma));
|
||||||
|
if (pSma->colIds == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
for (uint16_t i = 0; i < pSma->numOfColIds; ++i) {
|
||||||
|
buf = taosDecodeFixedU16(buf, pSma->colIds + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSma->numOfFuncIds > 0) {
|
||||||
|
pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma));
|
||||||
|
if (pSma->funcIds == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
|
||||||
|
buf = taosDecodeFixedU16(buf, pSma->funcIds + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
|
||||||
|
buf = taosDecodeFixedU32(buf, &pSW->number);
|
||||||
|
|
||||||
|
pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma));
|
||||||
|
if (pSW->tSma == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint32_t i = 0; i < pSW->number; ++i) {
|
||||||
|
if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
|
||||||
|
for (uint32_t j = i; j >= 0; --i) {
|
||||||
|
tdDestroyTSma(pSW->tSma + j, false);
|
||||||
|
}
|
||||||
|
free(pSW->tSma);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// RSma: Time-range-wise Rollup SMA
|
||||||
|
typedef struct {
|
||||||
|
int64_t interval;
|
||||||
|
int32_t retention; // unit: day
|
||||||
|
uint16_t days; // unit: day
|
||||||
|
int8_t intervalUnit;
|
||||||
|
} SSmaParams;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
STSma tsma;
|
||||||
|
float xFilesFactor;
|
||||||
|
SArray* smaParams; // SSmaParams
|
||||||
|
} SRSma;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
|
|
|
@ -206,6 +206,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
||||||
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
||||||
|
|
||||||
|
#define TSDB_INDEX_NAME_LEN 32
|
||||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||||
|
|
|
@ -38,6 +38,7 @@ struct SMetaDB {
|
||||||
// DB
|
// DB
|
||||||
DB *pTbDB;
|
DB *pTbDB;
|
||||||
DB *pSchemaDB;
|
DB *pSchemaDB;
|
||||||
|
|
||||||
// IDX
|
// IDX
|
||||||
DB *pNameIdx;
|
DB *pNameIdx;
|
||||||
DB *pStbIdx;
|
DB *pStbIdx;
|
||||||
|
|
|
@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SVCreateTbReq vCreateTbReq;
|
void *ptr = NULL;
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq;
|
|
||||||
void *ptr = NULL;
|
|
||||||
|
|
||||||
if (pVnode->config.streamMode == 0) {
|
if (pVnode->config.streamMode == 0) {
|
||||||
ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||||
|
@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_CREATE_STB:
|
case TDMT_VND_CREATE_STB: {
|
||||||
|
SVCreateTbReq vCreateTbReq = {0};
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
||||||
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
free(vCreateTbReq.stbCfg.pTagSchema);
|
free(vCreateTbReq.stbCfg.pTagSchema);
|
||||||
free(vCreateTbReq.name);
|
free(vCreateTbReq.name);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_CREATE_TABLE:
|
}
|
||||||
|
case TDMT_VND_CREATE_TABLE: {
|
||||||
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
||||||
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
|
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
|
||||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
||||||
|
@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB: {
|
||||||
|
SVCreateTbReq vAlterTbReq = {0};
|
||||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
|
||||||
free(vCreateTbReq.stbCfg.pSchema);
|
free(vAlterTbReq.stbCfg.pSchema);
|
||||||
free(vCreateTbReq.stbCfg.pTagSchema);
|
free(vAlterTbReq.stbCfg.pTagSchema);
|
||||||
free(vCreateTbReq.name);
|
free(vAlterTbReq.name);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -1,20 +1,39 @@
|
||||||
add_executable(tqTest "")
|
MESSAGE(STATUS "vnode unit test")
|
||||||
target_sources(tqTest
|
|
||||||
PRIVATE
|
# GoogleTest requires at least C++11
|
||||||
"tqMetaTest.cpp"
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
)
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
target_include_directories(tqTest
|
|
||||||
PUBLIC
|
# add_executable(tqTest "")
|
||||||
"${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
|
# target_sources(tqTest
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
# PRIVATE
|
||||||
|
# "tqMetaTest.cpp"
|
||||||
|
# )
|
||||||
|
# target_include_directories(tqTest
|
||||||
|
# PUBLIC
|
||||||
|
# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
|
||||||
|
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
# )
|
||||||
|
|
||||||
|
# target_link_libraries(tqTest
|
||||||
|
# tq
|
||||||
|
# gtest_main
|
||||||
|
# )
|
||||||
|
# enable_testing()
|
||||||
|
# add_test(
|
||||||
|
# NAME tq_test
|
||||||
|
# COMMAND tqTest
|
||||||
|
# )
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
tsdbSmaTest
|
||||||
|
PUBLIC os util common vnode gtest_main
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(tqTest
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
tq
|
tsdbSmaTest
|
||||||
gtest_main
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
|
||||||
)
|
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
|
||||||
enable_testing()
|
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
add_test(
|
|
||||||
NAME tq_test
|
|
||||||
COMMAND tqTest
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <taoserror.h>
|
||||||
|
#include <tglobal.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
|
||||||
|
#include "tsdbDef.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
TEST(testCase, tSmaInsertTest) {
|
||||||
|
STSma tSma = {0};
|
||||||
|
STSmaData* pSmaData = NULL;
|
||||||
|
STsdb tsdb = {0};
|
||||||
|
|
||||||
|
// init
|
||||||
|
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||||
|
tSma.interval = 1;
|
||||||
|
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
|
||||||
|
|
||||||
|
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
|
||||||
|
int32_t numOfColIds = 3;
|
||||||
|
int32_t numOfSmaBlocks = 10;
|
||||||
|
|
||||||
|
int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize;
|
||||||
|
|
||||||
|
pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen);
|
||||||
|
ASSERT_EQ(pSmaData != NULL, true);
|
||||||
|
pSmaData->tableUid = 3232329230;
|
||||||
|
pSmaData->numOfColIds = numOfColIds;
|
||||||
|
pSmaData->numOfSmaBlocks = numOfSmaBlocks;
|
||||||
|
pSmaData->dataLen = dataLen;
|
||||||
|
pSmaData->tsWindow.skey = 1640000000;
|
||||||
|
pSmaData->tsWindow.ekey = 1645788649;
|
||||||
|
pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds);
|
||||||
|
ASSERT_EQ(pSmaData->colIds != NULL, true);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfColIds; ++i) {
|
||||||
|
*(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute
|
||||||
|
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
|
// release
|
||||||
|
tdDestroySmaData(pSmaData);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#pragma GCC diagnostic pop
|
Loading…
Reference in New Issue