diff --git a/cmake/stub_CMakeLists.txt.in b/cmake/stub_CMakeLists.txt.in index e227e820d6..c768239006 100644 --- a/cmake/stub_CMakeLists.txt.in +++ b/cmake/stub_CMakeLists.txt.in @@ -1,12 +1,12 @@ # stub -ExternalProject_Add(stub - GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git - GIT_SUBMODULES "src" - SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub" - BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) +#ExternalProject_Add(stub +# GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git +# GIT_SUBMODULES "src" +# SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub" +# BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src" +# CONFIGURE_COMMAND "" +# BUILD_COMMAND "" +# INSTALL_COMMAND "" +# TEST_COMMAND "" +#) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ab0472a575..0dd7a513d6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1834,6 +1834,135 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) return buf; } +typedef struct { + uint8_t version; // for compatibility + uint8_t intervalUnit; + uint8_t slidingUnit; + char indexName[TSDB_INDEX_NAME_LEN + 1]; + col_id_t numOfColIds; + uint16_t numOfFuncIds; + uint64_t tableUid; // super/common table uid + int64_t interval; + int64_t sliding; + col_id_t* colIds; // N.B. sorted column ids + uint16_t* funcIds; // N.B. sorted sma function ids +} STSma; // Time-range-wise SMA + +typedef struct { + uint32_t number; + STSma* tSma; +} STSmaWrapper; + +static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { + if (pSma) { + tfree(pSma->colIds); + tfree(pSma->funcIds); + if (releaseSelf) { + free(pSma); + } + } +} + +static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU8(buf, pSma->version); + tlen += taosEncodeFixedU8(buf, pSma->intervalUnit); + tlen += taosEncodeFixedU8(buf, pSma->slidingUnit); + tlen += taosEncodeString(buf, pSma->indexName); + tlen += taosEncodeFixedU16(buf, pSma->numOfColIds); + tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds); + tlen += taosEncodeFixedU64(buf, pSma->tableUid); + tlen += taosEncodeFixedI64(buf, pSma->interval); + tlen += taosEncodeFixedI64(buf, pSma->sliding); + + for (uint16_t i = 0; i < pSma->numOfColIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i)); + } + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i)); + } + return tlen; +} + +static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU32(buf, pSW->number); + for (uint32_t i = 0; i < pSW->number; ++i) { + tlen += tEncodeTSma(buf, pSW->tSma + i); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { + buf = taosDecodeFixedU8(buf, &pSma->version); + buf = taosDecodeFixedU8(buf, &pSma->intervalUnit); + buf = taosDecodeFixedU8(buf, &pSma->slidingUnit); + buf = taosDecodeStringTo(buf, pSma->indexName); + buf = taosDecodeFixedU16(buf, &pSma->numOfColIds); + buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds); + buf = taosDecodeFixedU64(buf, &pSma->tableUid); + buf = taosDecodeFixedI64(buf, &pSma->interval); + buf = taosDecodeFixedI64(buf, &pSma->sliding); + + if (pSma->numOfColIds > 0) { + pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma)); + if (pSma->colIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfColIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->colIds + i); + } + } + + if (pSma->numOfFuncIds > 0) { + pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma)); + if (pSma->funcIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->funcIds + i); + } + } + + return buf; +} + +static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) { + buf = taosDecodeFixedU32(buf, &pSW->number); + + pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma)); + if (pSW->tSma == NULL) { + return NULL; + } + + for (uint32_t i = 0; i < pSW->number; ++i) { + if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) { + for (uint32_t j = i; j >= 0; --i) { + tdDestroyTSma(pSW->tSma + j, false); + } + free(pSW->tSma); + return NULL; + } + } + return buf; +} + +// RSma: Time-range-wise Rollup SMA +typedef struct { + int64_t interval; + int32_t retention; // unit: day + uint16_t days; // unit: day + int8_t intervalUnit; +} SSmaParams; + +typedef struct { + STSma tsma; + float xFilesFactor; + SArray* smaParams; // SSmaParams +} SRSma; + typedef struct { int64_t uid; int32_t numOfRows; diff --git a/include/util/tdef.h b/include/util/tdef.h index a1b4fc11cf..9695c2e4c8 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -206,6 +206,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_MAX_RETRIEVE 1024 +#define TSDB_INDEX_NAME_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 3c464391d2..91a7a91f3a 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -38,6 +38,7 @@ struct SMetaDB { // DB DB *pTbDB; DB *pSchemaDB; + // IDX DB *pNameIdx; DB *pStbIdx; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 81eb09f48f..a2616307ff 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { return 0; } -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - SVCreateTbBatchReq vCreateTbBatchReq; - void *ptr = NULL; +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + void *ptr = NULL; if (pVnode->config.streamMode == 0) { ptr = vnodeMalloc(pVnode, pMsg->contLen); @@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } switch (pMsg->msgType) { - case TDMT_VND_CREATE_STB: + case TDMT_VND_CREATE_STB: { + SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { free(vCreateTbReq.stbCfg.pTagSchema); free(vCreateTbReq.name); break; - case TDMT_VND_CREATE_TABLE: + } + case TDMT_VND_CREATE_TABLE: { + SVCreateTbBatchReq vCreateTbBatchReq = {0}; tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); @@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); break; - - case TDMT_VND_ALTER_STB: + } + case TDMT_VND_ALTER_STB: { + SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); - tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); - free(vCreateTbReq.stbCfg.pSchema); - free(vCreateTbReq.stbCfg.pTagSchema); - free(vCreateTbReq.name); + tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); + free(vAlterTbReq.stbCfg.pSchema); + free(vAlterTbReq.stbCfg.pTagSchema); + free(vAlterTbReq.name); break; + } case TDMT_VND_DROP_STB: vTrace("vgId:%d, process drop stb req", pVnode->vgId); break; diff --git a/source/dnode/vnode/test/CMakeLists.txt b/source/dnode/vnode/test/CMakeLists.txt index 6b468497d5..40d016300e 100644 --- a/source/dnode/vnode/test/CMakeLists.txt +++ b/source/dnode/vnode/test/CMakeLists.txt @@ -1,20 +1,39 @@ -add_executable(tqTest "") -target_sources(tqTest - PRIVATE - "tqMetaTest.cpp" -) -target_include_directories(tqTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +MESSAGE(STATUS "vnode unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# add_executable(tqTest "") +# target_sources(tqTest +# PRIVATE +# "tqMetaTest.cpp" +# ) +# target_include_directories(tqTest +# PUBLIC +# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" +# "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +# ) + +# target_link_libraries(tqTest +# tq +# gtest_main +# ) +# enable_testing() +# add_test( +# NAME tq_test +# COMMAND tqTest +# ) + +ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) +TARGET_LINK_LIBRARIES( + tsdbSmaTest + PUBLIC os util common vnode gtest_main ) -target_link_libraries(tqTest - tq - gtest_main -) -enable_testing() -add_test( - NAME tq_test - COMMAND tqTest +TARGET_INCLUDE_DIRECTORIES( + tsdbSmaTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/common" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp new file mode 100644 index 0000000000..90158b68b6 --- /dev/null +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "tsdbDef.h" + + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#if 0 +TEST(testCase, tSmaInsertTest) { + STSma tSma = {0}; + STSmaData* pSmaData = NULL; + STsdb tsdb = {0}; + + // init + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + + int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); + int32_t numOfColIds = 3; + int32_t numOfSmaBlocks = 10; + + int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + + pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); + ASSERT_EQ(pSmaData != NULL, true); + pSmaData->tableUid = 3232329230; + pSmaData->numOfColIds = numOfColIds; + pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->dataLen = dataLen; + pSmaData->tsWindow.skey = 1640000000; + pSmaData->tsWindow.ekey = 1645788649; + pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds); + ASSERT_EQ(pSmaData->colIds != NULL, true); + + for (int32_t i = 0; i < numOfColIds; ++i) { + *(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + + // execute + EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS); + + // release + tdDestroySmaData(pSmaData); +} +#endif + +#pragma GCC diagnostic pop \ No newline at end of file