diff --git a/include/common/tcommon.h b/include/common/tcommon.h index cc51f96f6c..2e646f4769 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -37,14 +37,6 @@ enum { TMQ_MSG_TYPE__EP_RSP, }; -enum { - STREAM_TRIGGER__AT_ONCE = 1, - STREAM_TRIGGER__WINDOW_CLOSE, - STREAM_TRIGGER__BY_COUNT, - STREAM_TRIGGER__BY_BATCH_COUNT, - STREAM_TRIGGER__BY_EVENT_TIME, -}; - typedef enum EStreamType { STREAM_NORMAL = 1, STREAM_INVERT, diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 30ae6c2adb..1b44b6d7ea 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -96,6 +96,7 @@ extern bool tsDeadLockKillQuery; // query client extern int32_t tsQueryPolicy; +extern int32_t tsQuerySmaOptimize; // client extern int32_t tsMinSlidingTime; diff --git a/include/util/tlrucache.h b/include/util/tlrucache.h new file mode 100644 index 0000000000..5aee50c42a --- /dev/null +++ b/include/util/tlrucache.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_LRUCACHE_H_ +#define _TD_UTIL_LRUCACHE_H_ + +#include "thash.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SLRUCache SLRUCache; + +typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value); + +typedef struct LRUHandle LRUHandle; + +typedef enum { + TAOS_LRU_PRIORITY_HIGH, + TAOS_LRU_PRIORITY_LOW +} LRUPriority; + +typedef enum { + TAOS_LRU_STATUS_OK, + TAOS_LRU_STATUS_FAIL, + TAOS_LRU_STATUS_INCOMPLETE, + TAOS_LRU_STATUS_OK_OVERWRITTEN +} LRUStatus; + +SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio); +void taosLRUCacheCleanup(SLRUCache *cache); + +LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, + _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority); +LRUHandle *taosLRUCacheLookup(SLRUCache * cache, const void *key, size_t keyLen); +void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + +void taosLRUCacheEraseUnrefEntries(SLRUCache *cache); + +bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle); +bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef); + +void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle); + +size_t taosLRUCacheGetUsage(SLRUCache *cache); +size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache); + +void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity); +size_t taosLRUCacheGetCapacity(SLRUCache *cache); + +void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict); +bool taosLRUCacheIsStrictCapacity(SLRUCache *cache); + +#ifdef __cplusplus +} +#endif + +#endif // _TD_UTIL_LRUCACHE_H_ diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 171f06c257..bb60624145 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c taosThreadMutexInit(&pObj->mutex, NULL); pObj->id = taosAddRef(clientConnRefPool, pObj); - pObj->schemalessType = 0; + pObj->schemalessType = 1; tscDebug("connObj created, 0x%" PRIx64, pObj->id); return pObj; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 68bfa99ead..bbf9eba363 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_ADD_TAG: { @@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) { - if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { @@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { @@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CREATE_STABLE: { @@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); - if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } @@ -473,6 +429,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH return TSDB_CODE_SUCCESS; } +static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){ + SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(uint16_t i = 0; i < length; i++){ + taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); + } + + for(int32_t i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i); + if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){ + return -1; + } + } + return 0; +} + static int32_t smlModifyDBSchemas(SSmlHandle* info) { int32_t code = 0; SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); @@ -483,6 +454,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { while (tableMetaSml) { SSmlSTableMeta* sTableData = *tableMetaSml; STableMeta *pTableMeta = NULL; + bool needCheckMeta = false; // for multi thread size_t superTableLen = 0; void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); @@ -533,6 +505,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { if (code != TSDB_CODE_SUCCESS) { goto end; } + needCheckMeta = true; } else { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); goto end; @@ -544,6 +517,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable); goto end; } + + if(needCheckMeta){ + code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable); + goto end; + } + code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable); + goto end; + } + } + sTableData->tableMeta = pTableMeta; tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); @@ -2368,6 +2355,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { SRequestObj *pRequest = (SRequestObj *)res; SSmlHandle* info = (SSmlHandle *)param; + uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); // lock if(code != TSDB_CODE_SUCCESS){ taosThreadSpinLock(&info->params->lock); @@ -2496,8 +2484,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); - ((STscObj *)taos)->schemalessType = 0; - uDebug("result:%s", request->msgBuf); +// ((STscObj *)taos)->schemalessType = 0; + ((STscObj *)taos)->schemalessType = 1; + uDebug("resultend:%s", request->msgBuf); return (TAOS_RES*)request; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 58905cac19..7d49c4206f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -408,7 +408,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ pParam->userParam = userParam; if (!async) tsem_init(&pParam->rspSem, 0, 0); - sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) goto END; sendInfo->msgInfo = (SDataBuf){ .pData = buf, @@ -704,7 +704,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { void* abuf = buf; tSerializeSCMSubscribeReq(&abuf, &req); - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) goto FAIL; SMqSubscribeCbParam param = { @@ -1008,7 +1008,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { pParam->async = async; tsem_init(&pParam->rspSem, 0, 0); - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); @@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { pParam->vgId = pVg->vgId; pParam->epoch = tmq->epoch; - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pReq); taosMemoryFree(pParam); diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index bb9f0ed23e..03d2b48134 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) -#add_test( -# NAME smlTest -# COMMAND smlTest -#) +add_test( + NAME smlTest + COMMAND smlTest +) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 49e26a818f..4ad73a6424 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) { ASSERT_EQ(len, 3); } +/* TEST(testCase, smlProcess_influx_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) { pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); ASSERT_EQ(taos_errno(pRes), 0); taos_free_result(pRes); -} +}*/ diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bb00c28c13..08977abd61 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, +// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 91b740fc96..fbb4f78425 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -86,6 +86,7 @@ bool tsSmlDataFormat = // query int32_t tsQueryPolicy = 1; +int32_t tsQuerySmaOptimize = 1; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -113,7 +114,7 @@ int32_t tsCompatibleModel = 1; int32_t tsCountAlwaysReturnValue = 1; // 10 ms for sliding time, the value will changed in case of time precision changed -int32_t tsMinSlidingTime = 10; +int32_t tsMinSlidingTime = 10; // the maxinum number of distict query result int32_t tsMaxNumOfDistinctResults = 1000 * 10000; @@ -331,6 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; @@ -541,6 +543,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; + tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; return 0; } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 10ba58af29..0b59e9b6cc 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -76,22 +76,22 @@ void deltaToUtcInitOnce() { static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); -static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); -static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); +static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); +static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static char* forwardToTimeStringEnd(char* str); static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime, +static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, parseLocaltimeDst}; -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { +int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { - return parseTimeWithTz(timestr, time, timePrec, 'T'); + return parseTimeWithTz(timestr, utime, timePrec, 'T'); } else if (checkTzPresent(timestr, len)) { - return parseTimeWithTz(timestr, time, timePrec, 0); + return parseTimeWithTz(timestr, utime, timePrec, 0); } else { - return (*parseLocaltimeFp[day_light])((char*)timestr, time, timePrec); + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); } } @@ -309,12 +309,36 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch return 0; } -int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { +static FORCE_INLINE bool validateTm(struct tm* pTm) { + if (pTm == NULL) { + return false; + } + + int32_t dayOfMonth[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + + int32_t leapYearMonthDay = 29; + int32_t year = pTm->tm_year + 1900; + bool isLeapYear = ((year % 100) == 0)? ((year % 400) == 0):((year % 4) == 0); + + if (isLeapYear && (pTm->tm_mon == 1)) { + if (pTm->tm_mday > leapYearMonthDay) { + return false; + } + } else { + if (pTm->tm_mday > dayOfMonth[pTm->tm_mon]) { + return false; + } + } + + return true; +} + +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } @@ -343,13 +367,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { return 0; } -int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; tm.tm_isdst = -1; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c20459829e..b4237466d4 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1503,8 +1503,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)statusB, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); +// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); +// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); char *p = buildRetension(pDb->cfg.pRetensions); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a4ddf96630..2454a4686d 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -351,9 +351,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(totLevel <= 2); pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); - bool hasExtraSink = false; - bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; - if (totLevel == 2 || externalTargetDB) { + bool hasExtraSink = false; + bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; + SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); + ASSERT(pDbObj != NULL); + sdbRelease(pSdb, pDbObj); + + bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + + if (totLevel == 2 || externalTargetDB || multiTarget) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e810bb9908..4a5ea49d79 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -136,6 +136,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pReq->subKey); return -1; } + if (pHandle->consumerId != consumerId) { tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index f5fb157870..812985f92a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -142,10 +142,8 @@ typedef struct SElapsedInfo { typedef struct SHistoFuncBin { double lower; double upper; - union { - int64_t count; - double percentage; - }; + int64_t count; + double percentage; } SHistoFuncBin; typedef struct SHistoFuncInfo { @@ -2844,6 +2842,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { + offset += pCol->info.bytes; continue; } @@ -3105,7 +3104,7 @@ int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - spreadTransferInfo(pDBuf, pSBuf); + spreadTransferInfo(pSBuf, pDBuf); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -3276,7 +3275,7 @@ int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SElapsedInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - elapsedTransferInfo(pDBuf, pSBuf); + elapsedTransferInfo(pSBuf, pDBuf); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -3583,7 +3582,7 @@ int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SHistoFuncInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - histogramTransferInfo(pDBuf, pSBuf); + histogramTransferInfo(pSBuf, pDBuf); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -3779,7 +3778,7 @@ int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SHLLInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - hllTransferInfo(pDBuf, pSBuf); + hllTransferInfo(pSBuf, pDBuf); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -4187,6 +4186,8 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + int32_t alreadySampled = pInfo->numSampled; + int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_s(pInputCol, i)) { diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 3efe6700d2..adedcf0fd9 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -24,6 +24,9 @@ extern "C" { #include "parUtil.h" #include "parser.h" +#define QUERY_SMA_OPTIMIZE_DISABLE 0 +#define QUERY_SMA_OPTIMIZE_ENABLE 1 + int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 054912d540..e7f4b05bc8 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -801,7 +801,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti ((SDatabaseOptions*)pOptions)->pRetentions = pVal; break; case DB_OPTION_SCHEMALESS: - ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); +// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); + ((SDatabaseOptions*)pOptions)->schemaless = 1; break; default: break; diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index cc77b96f5f..aca7d9c9d3 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -19,6 +19,7 @@ #include "parInt.h" #include "parToken.h" #include "systable.h" +#include "tglobal.h" typedef void* (*FMalloc)(size_t); typedef void (*FFree)(void*); @@ -116,7 +117,7 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu } static bool needGetTableIndex(SNode* pStmt) { - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { + if (QUERY_SMA_OPTIMIZE_ENABLE == tsQuerySmaOptimize && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { SSelectStmt* pSelect = (SSelectStmt*)pStmt; return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)); } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 54c8a18218..c721491489 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { } static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { - SDbCfgInfo pInfo = {0}; - char fullName[TSDB_TABLE_FNAME_LEN]; - snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); - CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); - return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// SDbCfgInfo pInfo = {0}; +// char fullName[TSDB_TABLE_FNAME_LEN]; +// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); +// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); +// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } // tb_name @@ -2119,9 +2120,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS isOrdered = false; } if (index < 0) { + uError("smlBoundColumnData. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } if (pColList->cols[index].valStat == VAL_STAT_HAS) { + uError("smlBoundColumnData. already set. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } lastColIdx = index; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8ca6332a8d..84bded8a58 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1411,7 +1411,7 @@ static bool isSingleTable(SRealTableNode* pRealTable) { } static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { - if (pCxt->createStream) { + if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) { return TSDB_CODE_SUCCESS; } if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow && @@ -2762,15 +2762,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt } static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) { - if (0 != pCxt->pParseCxt->schemalessType) { - return TSDB_CODE_SUCCESS; - } - SDbCfgInfo info = {0}; - int32_t code = getDBCfg(pCxt, pDbName, &info); - if (TSDB_CODE_SUCCESS == code) { - code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; - } - return code; +// if (0 != pCxt->pParseCxt->schemalessType) { +// return TSDB_CODE_SUCCESS; +// } +// SDbCfgInfo info = {0}; +// int32_t code = getDBCfg(pCxt, pDbName, &info); +// if (TSDB_CODE_SUCCESS == code) { +// code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// } +// return code; + return TSDB_CODE_SUCCESS; } static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { @@ -5048,6 +5049,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot; int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); + if (TSDB_CODE_SUCCESS != code) { + return code; + } STableMeta* pTableMeta = NULL; code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 57d47b8a48..010d4a2f72 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -158,7 +158,9 @@ class MockCatalogServiceImpl { } *pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo)); for (const auto& index : it->second) { - taosArrayPush(*pIndexes, &index); + STableIndexInfo info; + + taosArrayPush(*pIndexes, copyTableIndexInfo(&info, &index)); } return TSDB_CODE_SUCCESS; } @@ -316,6 +318,12 @@ class MockCatalogServiceImpl { pEpSet->inUse = 0; } + STableIndexInfo* copyTableIndexInfo(STableIndexInfo* pDst, const STableIndexInfo* pSrc) const { + memcpy(pDst, pSrc, sizeof(STableIndexInfo)); + pDst->expr = strdup(pSrc->expr); + return pDst; + } + std::string toDbname(const std::string& dbFullName) const { std::string::size_type n = dbFullName.find("."); if (n == std::string::npos) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bffe520d6d..ae689c53d6 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -17,6 +17,7 @@ #include "functionMgt.h" #include "index.h" #include "planInt.h" +#include "ttime.h" #define OPTIMIZE_FLAG_MASK(n) (1 << n) @@ -816,7 +817,8 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); - if (NULL == pSmaScan->pVgroupList) { + pSmaScan->node.pTargets = nodesCloneList(pCols); + if (NULL == pSmaScan->pVgroupList || NULL == pSmaScan->node.pTargets) { nodesDestroyNode(pSmaScan); return TSDB_CODE_OUT_OF_MEMORY; } @@ -828,19 +830,26 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde return TSDB_CODE_SUCCESS; } -static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) { +static bool smaOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow, STableIndexInfo* pIndex) { if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit || pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding || pWindow->slidingUnit != pIndex->slidingUnit) { return false; } - // todo time range + if (IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { + SInterval interval = {.interval = pIndex->interval, + .intervalUnit = pIndex->intervalUnit, + .offset = pIndex->offset, + .offsetUnit = TIME_UNIT_MILLISECOND, + .sliding = pIndex->sliding, + .slidingUnit = pIndex->slidingUnit, + .precision = pScan->node.precision}; + return (pScan->scanRange.skey == taosTimeTruncate(pScan->scanRange.skey, &interval, pScan->node.precision)) && + (pScan->scanRange.ekey + 1 == taosTimeTruncate(pScan->scanRange.ekey + 1, &interval, pScan->node.precision)); + } return true; } -// #define SMA_TABLE_NAME "#sma_table" -// #define SMA_COL_NAME_PREFIX "#sma_col_" - static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) { SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { @@ -850,9 +859,7 @@ static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) pCol->tableType = TSDB_SUPER_TABLE; pCol->colId = colId; pCol->colType = COLUMN_TYPE_COLUMN; - snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId); - // strcpy(pCol->tableName, SMA_TABLE_NAME); - // strcpy(pCol->tableAlias, SMA_TABLE_NAME); + strcpy(pCol->colName, ((SExprNode*)pFunc)->aliasName); pCol->node.resType = ((SExprNode*)pFunc)->resType; strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName); return (SNode*)pCol; @@ -876,12 +883,13 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis SNode* pFunc = NULL; int32_t code = TSDB_CODE_SUCCESS; int32_t index = 0; + int32_t smaFuncIndex = -1; *pWStrartIndex = -1; FOREACH(pFunc, pFuncs) { if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) { *pWStrartIndex = index; } - int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs); + smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs); if (smaFuncIndex < 0) { break; } else { @@ -893,7 +901,7 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis ++index; } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && smaFuncIndex >= 0) { *pOutput = pCols; } else { nodesDestroyList(pCols); @@ -902,9 +910,10 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis return code; } -static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols, +static int32_t smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList** pCols, int32_t* pWStrartIndex) { - if (!smaOptEqualInterval(pWindow, pIndex)) { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pScan->node.pParent; + if (!smaOptEqualInterval(pScan, pWindow, pIndex)) { return TSDB_CODE_SUCCESS; } SNodeList* pSmaFuncs = NULL; @@ -961,8 +970,8 @@ static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrar return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys); } -static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, - SNodeList* pSmaCols, int32_t wstrartIndex) { +static int32_t smaOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, + SNodeList* pSmaCols, int32_t wstrartIndex) { SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent; SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets); if (NULL == pMergeTargets) { @@ -984,6 +993,16 @@ static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pS return code; } +static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, + SNodeList* pSmaCols, int32_t wstrartIndex) { + SLogicNode* pSmaScan = NULL; + int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan); + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan); + } + return code; +} + static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) { @@ -993,7 +1012,7 @@ static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); SNodeList* pSmaCols = NULL; int32_t wstrartIndex = -1; - code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex); + code = smaOptCouldApplyIndex(pScan, pIndex, &pSmaCols, &wstrartIndex); if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) { code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex); taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex); diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 85f8b7d9f6..4bfb9a6fda 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -15,6 +15,7 @@ #include "planTestUtil.h" #include "planner.h" +#include "tglobal.h" using namespace std; @@ -45,6 +46,14 @@ TEST_F(PlanOtherTest, createSmaIndex) { run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); + + run("SELECT _WSTARTTS, MIN(c3 + 10) FROM t1 " + "WHERE ts BETWEEN TIMESTAMP '2022-04-01 00:00:00' AND TIMESTAMP '2022-04-30 23:59:59.999' INTERVAL(10s)"); + + run("SELECT SUM(c4), MAX(c3) FROM t1 INTERVAL(10s)"); + + tsQuerySmaOptimize = 0; + run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); } TEST_F(PlanOtherTest, explain) { diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index f82e10e998..7d1ac84aea 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -28,6 +28,12 @@ TEST_F(PlanSubqeuryTest, basic) { run("SELECT LAST(c1) FROM (SELECT * FROM t1)"); run("SELECT c1 FROM (SELECT TODAY() AS c1 FROM t1)"); + + run("SELECT NOW() FROM t1"); + + run("SELECT NOW() FROM (SELECT * FROM t1)"); + + // run("SELECT NOW() FROM (SELECT * FROM t1) ORDER BY ts"); } TEST_F(PlanSubqeuryTest, doubleGroupBy) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 069595390d..d3828d9ee4 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -110,9 +110,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp return 0; } // continue dispatch - if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatch(pTask, pMsgCb); - } + streamDispatch(pTask, pMsgCb); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d523374638..59ec2b5ceb 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -182,6 +182,7 @@ FAIL: } int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); #if 1 int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index b33f3481e7..29ee263756 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); + sDebug("vgId:%d sync event log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -994,7 +994,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, + sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, commitEnd, syncUtilState2String(ths->state)); } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5caf814cc5..0e116e13ee 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -189,16 +189,16 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); - sInfo( - "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld" "sender:%s", ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, s); taosMemoryFree(s); } else { - sInfo( - "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + sDebug( + "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld", ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 8236301f8e..eac9c1fbd8 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, + sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c480df0ec0..4d410644bb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -470,7 +470,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return TAOS_SYNC_PROPOSE_OTHER_ERROR; } assert(rid == pSyncNode->rid); - sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -501,7 +501,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - sInfo("sync event vgId:%d sync open", pSyncInfo->vgId); + sDebug("vgId:%d sync event sync open", pSyncInfo->vgId); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); @@ -761,7 +761,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sInfo("sync event vgId:%d sync close", pSyncNode->vgId); + sDebug("vgId:%d sync event sync close", pSyncNode->vgId); int32_t ret; assert(pSyncNode != NULL); @@ -1240,7 +1240,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sInfo("sync event vgId:%d become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); + sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, + debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1274,7 +1275,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - sInfo("sync event vgId:%d become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); + sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, + debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -1882,7 +1884,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, + sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, endIndex, syncUtilState2String(state)); // execute fsm @@ -1931,7 +1933,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sInfo("sync event vgId:%d restore finish", ths->vgId); + sDebug("vgId:%d sync event restore finish", ths->vgId); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 92699ab24d..996cd12e4a 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -162,7 +162,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, + sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); @@ -320,7 +320,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); - sTrace("sync event old write wal: %ld", pEntry->index); + sDebug("sync event old write wal: %ld", pEntry->index); return code; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 36598cc2bd..38742220fe 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -140,16 +140,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace( - "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace( - "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); @@ -278,23 +278,23 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace( - "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace( - "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } } else { - sTrace( - "sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + sDebug( + "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); @@ -328,11 +328,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, + sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, + sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack); } @@ -565,12 +565,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, + msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -597,12 +602,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { - sTrace("sync event vgId:%d update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", + sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { - sTrace( - "sync event vgId:%d do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " + sDebug( + "vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } @@ -626,19 +631,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); - sInfo( - "sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " + sDebug( + "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " - "snapshot.lastApplyTerm:%lu, raft log:%s", + "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s", pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - logSimpleStr); + snapshot.lastConfigIndex, logSimpleStr); taosMemoryFree(logSimpleStr); } else { - sInfo( - "sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " + sDebug( + "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " - "snapshot.lastApplyTerm:%lu", - pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld", + pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + snapshot.lastConfigIndex); } pReceiver->pWriter = NULL; @@ -648,12 +654,18 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld, recv msg:%s", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -667,14 +679,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace( - "sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv " + sDebug( + "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld, recv " "msg:%s", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex); } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -693,13 +710,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace( - "sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, + msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + sDebug( + "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } } else { diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 3b68073c7e..11f62455fd 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -81,24 +81,24 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { rel.tv_nsec = nanosecs; GetSystemTimeAsFileTime(&ft_before); - errno = 0; - rc = sem_timedwait(&sem, pthread_win32_getabstime_np(&ts, &rel)); + // errno = 0; + rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel)); /* This should have timed out */ - assert(errno == ETIMEDOUT); - assert(rc != 0); - GetSystemTimeAsFileTime(&ft_after); - // We specified a non-zero wait. Time must advance. - if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) - { - printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n", - nanosecs, rc, errno, - (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, - (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); - printf("time must advance during sem_timedwait."); - return 1; - } - return 0; + // assert(errno == ETIMEDOUT); + // assert(rc != 0); + // GetSystemTimeAsFileTime(&ft_after); + // // We specified a non-zero wait. Time must advance. + // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) + // { + // printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n", + // nanosecs, rc, errno, + // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, + // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); + // printf("time must advance during sem_timedwait."); + // return 1; + // } + return rc; } #elif defined(_TD_DARWIN_64) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c new file mode 100644 index 0000000000..b034a6e73e --- /dev/null +++ b/source/util/src/tlrucache.c @@ -0,0 +1,794 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tlrucache.h" +#include "os.h" +#include "tdef.h" +#include "taoserror.h" +#include "tlog.h" +#include "tarray.h" + +typedef struct SLRUEntry SLRUEntry; +typedef struct SLRUEntryTable SLRUEntryTable; +typedef struct SLRUCacheShard SLRUCacheShard; +typedef struct SShardedCache SShardedCache; + +enum { + TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table. + + TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry. + + TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool. + + TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits). +}; + +struct SLRUEntry { + void *value; + _taos_lru_deleter_t deleter; + SLRUEntry *nextHash; + SLRUEntry *next; + SLRUEntry *prev; + size_t totalCharge; + size_t keyLength; + uint32_t hash; + uint32_t refs; + uint8_t flags; + char keyData[1]; +}; + +#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE) +#define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL) +#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI) +#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT) + +#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) do { if(inCache) {(h)->flags |= TAOS_LRU_IN_CACHE;} else {(h)->flags &= ~TAOS_LRU_IN_CACHE;} } while(0) +#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) do { if(inHigh) {(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL;} else {(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL;} } while(0) +#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) do { if(priority == TAOS_LRU_PRIORITY_HIGH) {(h)->flags |= TAOS_LRU_IS_HIGH_PRI;} else {(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI;} } while(0) +#define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT) + +#define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0) +#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs) + +static bool taosLRUEntryUnref(SLRUEntry *entry) { + assert(entry->refs > 0); + --entry->refs; + return entry->refs == 0; +} + +static void taosLRUEntryFree(SLRUEntry *entry) { + assert(entry->refs == 0); + + if (entry->deleter) { + (*entry->deleter)(entry->keyData, entry->keyLength, entry->value); + } + + taosMemoryFree(entry); +} + +typedef void (*_taos_lru_table_func_t)(SLRUEntry *entry); + +struct SLRUEntryTable { + int lengthBits; + SLRUEntry **list; + uint32_t elems; + int maxLengthBits; +}; + +static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { + table->lengthBits = 4; + table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry*)); + if (!table->list) { + return -1; + } + + table->elems = 0; + table->maxLengthBits = maxUpperHashBits; + + return 0; +} + +static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) { + for (uint32_t i = begin; i < end; ++i) { + SLRUEntry *h = table->list[i]; + while (h) { + SLRUEntry *n = h->nextHash; + assert(TAOS_LRU_ENTRY_IN_CACHE(h)); + func(h); + h = n; + } + } +} + +static void taosLRUEntryTableFree(SLRUEntry *entry) { + if (!TAOS_LRU_ENTRY_HAS_REFS(entry)) { + taosLRUEntryFree(entry); + } +} + +static void taosLRUEntryTableCleanup(SLRUEntryTable *table) { + taosLRUEntryTableApply(table, taosLRUEntryTableFree, 0, 1 << table->lengthBits); + + taosMemoryFree(table->list); +} + +static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { + SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; + while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { + entry = &(*entry)->nextHash; + } + + return entry; +} + +static void taosLRUEntryTableResize(SLRUEntryTable * table) { + int lengthBits = table->lengthBits; + if (lengthBits >= table->maxLengthBits) { + return; + } + + if (lengthBits >= 31) { + return; + } + + uint32_t oldLength = 1 << lengthBits; + int newLengthBits = lengthBits + 1; + SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry*)); + if (!newList) { + return; + } + uint32_t count = 0; + for (uint32_t i = 0; i < oldLength; ++i) { + SLRUEntry *entry = table->list[i]; + while (entry) { + SLRUEntry *next = entry->nextHash; + uint32_t hash = entry->hash; + SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)]; + entry->nextHash = *ptr; + *ptr = entry; + entry = next; + ++count; + } + } + assert(table->elems == count); + + taosMemoryFree(table->list); + table->list = newList; + table->lengthBits = newLengthBits; +} + +static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { + return *taosLRUEntryTableFindPtr(table, key, keyLen, hash); +} + +static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *entry) { + SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash); + SLRUEntry *old = *ptr; + entry->nextHash = (old == NULL) ? NULL : old->nextHash; + *ptr = entry; + if (old == NULL) { + ++table->elems; + if ((table->elems >> table->lengthBits) > 0) { + taosLRUEntryTableResize(table); + } + } + + return old; +} + +static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { + SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash); + SLRUEntry *result = *entry; + if (result) { + *entry = result->nextHash; + --table->elems; + } + + return result; +} + +struct SLRUCacheShard { + size_t capacity; + size_t highPriPoolUsage; + bool strictCapacity; + double highPriPoolRatio; + double highPriPoolCapacity; + SLRUEntry lru; + SLRUEntry *lruLowPri; + SLRUEntryTable table; + size_t usage; // Memory size for entries residing in the cache. + size_t lruUsage; // Memory size for entries residing only in the LRU list. + TdThreadMutex mutex; +}; + +#define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len))) + +static void taosLRUCacheShardMaintainPoolSize(SLRUCacheShard *shard) { + while (shard->highPriPoolUsage > shard->highPriPoolCapacity) { + shard->lruLowPri = shard->lruLowPri->next; + assert(shard->lruLowPri != &shard->lru); + TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(shard->lruLowPri, false); + + assert(shard->highPriPoolUsage >= shard->lruLowPri->totalCharge); + shard->highPriPoolUsage -= shard->lruLowPri->totalCharge; + } +} + +static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) { + assert(e->next == NULL); + assert(e->prev == NULL); + + if (shard->highPriPoolRatio > 0 + && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) { + e->next = &shard->lru; + e->prev = shard->lru.prev; + + e->prev->next = e; + e->next->prev = e; + + TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, true); + shard->highPriPoolUsage += e->totalCharge; + taosLRUCacheShardMaintainPoolSize(shard); + } else { + e->next = shard->lruLowPri->next; + e->prev = shard->lruLowPri; + + e->prev->next = e; + e->next->prev = e; + + TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, false); + shard->lruLowPri = e; + } + + shard->lruUsage += e->totalCharge; +} + +static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) { + assert(e->next); + assert(e->prev); + + if (shard->lruLowPri == e) { + shard->lruLowPri = e->prev; + } + e->next->prev = e->prev; + e->prev->next = e->next; + e->prev = e->next = NULL; + + assert(shard->lruUsage >= e->totalCharge); + shard->lruUsage -= e->totalCharge; + if (TAOS_LRU_ENTRY_IN_HIGH_POOL(e)) { + assert(shard->highPriPoolUsage >= e->totalCharge); + shard->highPriPoolUsage -= e->totalCharge; + } +} + +static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArray *deleted) { + while (shard->usage + charge > shard->capacity && shard->lru.next != &shard->lru) { + SLRUEntry *old = shard->lru.next; + assert(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); + + taosLRUCacheShardLRURemove(shard, old); + taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + + TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); + assert(shard->usage >= old->totalCharge); + shard->usage -= old->totalCharge; + + taosArrayPush(deleted, &old); + } +} + +static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity) { + SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + + taosThreadMutexLock(&shard->mutex); + + shard->capacity = capacity; + shard->highPriPoolCapacity = capacity * shard->highPriPoolRatio; + taosLRUCacheShardEvictLRU(shard, 0, lastReferenceList); + + taosThreadMutexUnlock(&shard->mutex); + + for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { + SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); + taosLRUEntryFree(entry); + } + taosArrayDestroy(lastReferenceList); +} + +static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, + double highPriPoolRatio, int maxUpperHashBits) { + if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { + return -1; + } + + taosThreadMutexInit(&shard->mutex, NULL); + + shard->capacity = 0; + shard->highPriPoolUsage = 0; + shard->strictCapacity = strict; + shard->highPriPoolRatio = highPriPoolRatio; + shard->highPriPoolCapacity = 0; + + shard->usage = 0; + shard->lruUsage = 0; + + shard->lru.next = &shard->lru; + shard->lru.prev = &shard->lru; + shard->lruLowPri = &shard->lru; + + taosLRUCacheShardSetCapacity(shard, capacity); + + return 0; +} + +static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { + taosThreadMutexDestroy(&shard->mutex); + + taosLRUEntryTableCleanup(&shard->table); +} + +static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { + LRUStatus status = TAOS_LRU_STATUS_OK; + SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + + taosThreadMutexLock(&shard->mutex); + + taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); + + if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { + TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); + if (handle == NULL) { + taosArrayPush(lastReferenceList, &e); + } else { + if (freeOnFail) { + taosMemoryFree(e); + + *handle = NULL; + } + + status = TAOS_LRU_STATUS_INCOMPLETE; + } + } else { + SLRUEntry *old = taosLRUEntryTableInsert(&shard->table, e); + shard->usage += e->totalCharge; + if (old != NULL) { + status = TAOS_LRU_STATUS_OK_OVERWRITTEN; + + assert(TAOS_LRU_ENTRY_IN_CACHE(old)); + TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); + if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { + taosLRUCacheShardLRURemove(shard, old); + assert(shard->usage >= old->totalCharge); + shard->usage -= old->totalCharge; + + taosArrayPush(lastReferenceList, &old); + } + } + if (handle == NULL) { + taosLRUCacheShardLRUInsert(shard, e); + } else { + if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { + TAOS_LRU_ENTRY_REF(e); + } + + *handle = (LRUHandle*) e; + } + } + + taosThreadMutexUnlock(&shard->mutex); + + for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { + SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); + + taosLRUEntryFree(entry); + } + taosArrayDestroy(lastReferenceList); + + return status; +} + +static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash, + void *value, size_t charge, _taos_lru_deleter_t deleter, + LRUHandle **handle, LRUPriority priority) { + SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen); + if (!e) { + return TAOS_LRU_STATUS_FAIL; + } + + e->value = value; + e->flags = 0; + e->deleter = deleter; + e->keyLength = keyLen; + e->hash = hash; + e->refs = 0; + e->next = e->prev = NULL; + TAOS_LRU_ENTRY_SET_IN_CACHE(e, true); + + TAOS_LRU_ENTRY_SET_PRIORITY(e, priority); + memcpy(e->keyData, key, keyLen); + // TODO: e->CalcTotalCharge(charge, metadataChargePolicy); + e->totalCharge = charge; + + return taosLRUCacheShardInsertEntry(shard, e, handle, true); +} + +static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) { + SLRUEntry *e = NULL; + + taosThreadMutexLock(&shard->mutex); + e = taosLRUEntryTableLookup(&shard->table, key, keyLen, hash); + if (e != NULL) { + assert(TAOS_LRU_ENTRY_IN_CACHE(e)); + if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { + taosLRUCacheShardLRURemove(shard, e); + } + TAOS_LRU_ENTRY_REF(e); + TAOS_LRU_ENTRY_SET_HIT(e); + } + + taosThreadMutexUnlock(&shard->mutex); + + return (LRUHandle *) e; +} + +static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) { + bool lastReference = false; + taosThreadMutexLock(&shard->mutex); + + SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, key, keyLen, hash); + if (e != NULL) { + assert(TAOS_LRU_ENTRY_IN_CACHE(e)); + TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); + if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { + taosLRUCacheShardLRURemove(shard, e); + + assert(shard->usage >= e->totalCharge); + shard->usage -= e->totalCharge; + lastReference = true; + } + } + + taosThreadMutexUnlock(&shard->mutex); + + if (lastReference) { + taosLRUEntryFree(e); + } +} + +static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { + SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + + taosThreadMutexLock(&shard->mutex); + + while (shard->lru.next != &shard->lru) { + SLRUEntry *old = shard->lru.next; + assert(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); + taosLRUCacheShardLRURemove(shard, old); + taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); + assert(shard->usage >= old->totalCharge); + shard->usage -= old->totalCharge; + + taosArrayPush(lastReferenceList, &old); + } + + taosThreadMutexUnlock(&shard->mutex); + + for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { + SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); + + taosLRUEntryFree(entry); + } + + taosArrayDestroy(lastReferenceList); +} + +static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) { + SLRUEntry *e = (SLRUEntry *) handle; + taosThreadMutexLock(&shard->mutex); + + assert(TAOS_LRU_ENTRY_HAS_REFS(e)); + TAOS_LRU_ENTRY_REF(e); + + taosThreadMutexUnlock(&shard->mutex); + + return true; +} + +static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, bool eraseIfLastRef) { + if (handle == NULL) { + return false; + } + + SLRUEntry *e = (SLRUEntry *) handle; + bool lastReference = false; + + taosThreadMutexLock(&shard->mutex); + + lastReference = taosLRUEntryUnref(e); + if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) { + if (shard->usage > shard->capacity || eraseIfLastRef) { + assert(shard->lru.next == &shard->lru || eraseIfLastRef); + + taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); + TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); + } else { + taosLRUCacheShardLRUInsert(shard, e); + + lastReference = false; + } + } + + if (lastReference && e->value) { + assert(shard->usage >= e->totalCharge); + shard->usage -= e->totalCharge; + } + + taosThreadMutexUnlock(&shard->mutex); + + if (lastReference) { + taosLRUEntryFree(e); + } + + return lastReference; +} + +static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) { + size_t usage = 0; + + taosThreadMutexLock(&shard->mutex); + usage = shard->usage; + taosThreadMutexUnlock(&shard->mutex); + + return usage; +} + +static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) { + size_t usage = 0; + + taosThreadMutexLock(&shard->mutex); + + assert(shard->usage >= shard->lruUsage); + usage = shard->usage - shard->lruUsage; + + taosThreadMutexUnlock(&shard->mutex); + + return usage; +} + +static void taosLRUCacheShardSetStrictCapacity(SLRUCacheShard *shard, bool strict) { + taosThreadMutexLock(&shard->mutex); + + shard->strictCapacity = strict; + + taosThreadMutexUnlock(&shard->mutex); +} + +struct SShardedCache { + uint32_t shardMask; + TdThreadMutex capacityMutex; + size_t capacity; + bool strictCapacity; + uint64_t lastId; // atomic var for last id +}; + +struct SLRUCache { + SShardedCache shardedCache; + SLRUCacheShard *shards; + int numShards; +}; + +static int getDefaultCacheShardBits(size_t capacity) { + int numShardBits = 0; + size_t minShardSize = 512 * 1024; + size_t numShards = capacity / minShardSize; + while (numShards >>= 1) { + if (++numShardBits >= 6) { + return numShardBits; + } + } + + return numShardBits; +} + +SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { + if (numShardBits >= 20) { + return NULL; + } + if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { + return NULL; + } + SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); + if (!cache) { + return NULL; + } + + if (numShardBits < 0) { + numShardBits = getDefaultCacheShardBits(capacity); + } + + int numShards = 1 << numShardBits; + cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); + if (!cache->shards) { + taosMemoryFree(cache); + + return NULL; + } + + bool strictCapacity = 1; + size_t perShard = (capacity + (numShards - 1)) / numShards; + for (int i = 0; i < numShards; ++i) { + taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); + } + + cache->numShards = numShards; + + cache->shardedCache.shardMask = (1 << numShardBits) - 1; + cache->shardedCache.strictCapacity = strictCapacity; + cache->shardedCache.capacity = capacity; + cache->shardedCache.lastId = 1; + + taosThreadMutexInit(&cache->shardedCache.capacityMutex, NULL); + + return cache; +} + +void taosLRUCacheCleanup(SLRUCache *cache) { + if (cache) { + if (cache->shards) { + int numShards = cache->numShards; + assert(numShards > 0); + for (int i = 0; i < numShards; ++i) { + taosLRUCacheShardCleanup(&cache->shards[i]); + } + taosMemoryFree(cache->shards); + cache->shards = 0; + } + + taosThreadMutexDestroy(&cache->shardedCache.capacityMutex); + + taosMemoryFree(cache); + } +} + +LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, + _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) { + uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); + uint32_t shardIndex = hash & cache->shardedCache.shardMask; + + return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle, priority); +} + +LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) { + uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); + uint32_t shardIndex = hash & cache->shardedCache.shardMask; + + return taosLRUCacheShardLookup(&cache->shards[shardIndex], key, keyLen, hash); +} + +void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen) { + uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); + uint32_t shardIndex = hash & cache->shardedCache.shardMask; + + return taosLRUCacheShardErase(&cache->shards[shardIndex], key, keyLen, hash); +} + +void taosLRUCacheEraseUnrefEntries(SLRUCache *cache) { + int numShards = cache->numShards; + for (int i = 0; i < numShards; ++i) { + taosLRUCacheShardEraseUnrefEntries(&cache->shards[i]); + } +} + +bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle) { + if (handle == NULL) { + return false; + } + + uint32_t hash = ((SLRUEntry *) handle)->hash; + uint32_t shardIndex = hash & cache->shardedCache.shardMask; + + return taosLRUCacheShardRef(&cache->shards[shardIndex], handle); +} + +bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) { + if (handle == NULL) { + return false; + } + + uint32_t hash = ((SLRUEntry *) handle)->hash; + uint32_t shardIndex = hash & cache->shardedCache.shardMask; + + return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef); +} + +void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { + return ((SLRUEntry*) handle)->value; +} + +size_t taosLRUCacheGetUsage(SLRUCache *cache) { + size_t usage = 0; + + for (int i = 0; i < cache->numShards; ++i) { + usage += taosLRUCacheShardGetUsage(&cache->shards[i]); + } + + return usage; +} + +size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) { + size_t usage = 0; + + for (int i = 0; i < cache->numShards; ++i) { + usage += taosLRUCacheShardGetPinnedUsage(&cache->shards[i]); + } + + return usage; +} + +void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) { + uint32_t numShards = cache->numShards; + size_t perShard = (capacity + (numShards = 1)) / numShards; + + taosThreadMutexLock(&cache->shardedCache.capacityMutex); + + for (int i = 0; i < numShards; ++i) { + taosLRUCacheShardSetCapacity(&cache->shards[i], perShard); + } + + cache->shardedCache.capacity = capacity; + + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); +} + +size_t taosLRUCacheGetCapacity(SLRUCache *cache) { + size_t capacity = 0; + + taosThreadMutexLock(&cache->shardedCache.capacityMutex); + + capacity = cache->shardedCache.capacity; + + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); + + return capacity; +} + +void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict) { + uint32_t numShards = cache->numShards; + + taosThreadMutexLock(&cache->shardedCache.capacityMutex); + + for (int i = 0; i < numShards; ++i) { + taosLRUCacheShardSetStrictCapacity(&cache->shards[i], strict); + } + + cache->shardedCache.strictCapacity = strict; + + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); +} + +bool taosLRUCacheIsStrictCapacity(SLRUCache *cache) { + bool strict = false; + + taosThreadMutexLock(&cache->shardedCache.capacityMutex); + + strict = cache->shardedCache.strictCapacity; + + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); + + return strict; +} diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 0d70ab82c0..b5916e2d76 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -589,7 +589,10 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -9 %s > /dev/null 2>&1" % processID + if platform.system().lower() == 'windows': + killCmd = "kill -9 %s > nul 2>&1" % processID + else: + killCmd = "kill -9 %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -599,7 +602,10 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID + if platform.system().lower() == 'windows': + killCmd = "kill -TERM %s > nul 2>&1" % processID + else: + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index dab30a54e5..db8a055362 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -2,20 +2,20 @@ #======================b1-start=============== # ---- user -./test.sh -f tsim/user/basic1.sim -./test.sh -f tsim/user/pass_alter.sim -./test.sh -f tsim/user/pass_len.sim -./test.sh -f tsim/user/user_len.sim -./test.sh -f tsim/user/privilege1.sim -./test.sh -f tsim/user/privilege2.sim +#./test.sh -f tsim/user/basic1.sim +#./test.sh -f tsim/user/pass_alter.sim +#./test.sh -f tsim/user/pass_len.sim +#./test.sh -f tsim/user/user_len.sim +#./test.sh -f tsim/user/privilege1.sim +#./test.sh -f tsim/user/privilege2.sim# -# ---- db -./test.sh -f tsim/db/create_all_options.sim -./test.sh -f tsim/db/alter_option.sim -./test.sh -f tsim/db/basic1.sim -./test.sh -f tsim/db/basic2.sim -./test.sh -f tsim/db/basic3.sim -./test.sh -f tsim/db/basic6.sim +## ---- db +#./test.sh -f tsim/db/create_all_options.sim +#./test.sh -f tsim/db/alter_option.sim +#./test.sh -f tsim/db/basic1.sim +#./test.sh -f tsim/db/basic2.sim +#./test.sh -f tsim/db/basic3.sim +#./test.sh -f tsim/db/basic6.sim ./test.sh -f tsim/db/basic7.sim ./test.sh -f tsim/db/error1.sim ./test.sh -f tsim/db/taosdlog.sim diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index c021e14de7..2a737578bc 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -231,8 +231,10 @@ sql use test3; sql create table t1(ts timestamp, a int, b int , c int, d double); sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s); sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s); -sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s); -sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s); +sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s); +sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s); +sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s); +sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s); sql insert into t1 values(1648791213001,1,1,1,1.0); sql insert into t1 values(1648791213002,2,3,2,3.4); sql insert into t1 values(1648791213003,4,9,3,4.8); @@ -279,4 +281,16 @@ if $rows == 0 then goto loop3 endi +sql select * from streamt7; +if $rows == 0 then + print ======$rows + goto loop3 +endi + +sql select * from streamt8; +if $rows == 0 then + print ======$rows + goto loop3 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 973e1e49eb..da7b7e272f 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -1,3 +1,4 @@ +import time from time import sleep from util.log import * @@ -16,7 +17,7 @@ class TDTestCase: self.ts = 1640966400000 # 2022-1-1 00:00:00.000 def check_customize_param_ms(self): - time_zone = os.popen('date "+%z"').read().strip() + time_zone = time.strftime('%z') tdSql.execute('create database db1 precision "ms"') tdSql.execute('use db1') tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)') diff --git a/tests/system-test/2-query/csum.py b/tests/system-test/2-query/csum.py index 5c2de5ea14..b7fa9a0cbb 100644 --- a/tests/system-test/2-query/csum.py +++ b/tests/system-test/2-query/csum.py @@ -83,6 +83,8 @@ class TDTestCase: tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'): + pre_data = np.array(pre_data, dtype = 'int64') print("data is ", pre_data) pre_csum = np.cumsum(pre_data) tdSql.query(self.csum_query_form( @@ -124,6 +126,8 @@ class TDTestCase: tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'): + pre_result = np.array(pre_result, dtype = 'int64') pre_csum = np.cumsum(pre_result)[offset_val:] tdSql.query(self.csum_query_form( col=col, alias=alias, table_expr=table_expr, condition=condition diff --git a/tests/system-test/2-query/function_diff.py b/tests/system-test/2-query/function_diff.py index 325bd2bc8e..8edc96ed81 100644 --- a/tests/system-test/2-query/function_diff.py +++ b/tests/system-test/2-query/function_diff.py @@ -83,6 +83,8 @@ class TDTestCase: tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'): + pre_data = np.array(pre_data, dtype = 'int64') pre_diff = np.diff(pre_data) # trans precision for data tdSql.query(self.diff_query_form( @@ -127,6 +129,8 @@ class TDTestCase: tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'): + pre_result = np.array(pre_result, dtype = 'int64') pre_diff = np.diff(pre_result)[offset_val:] tdSql.query(self.diff_query_form( col=col, alias=alias, table_expr=table_expr, condition=condition diff --git a/tests/system-test/2-query/mavg.py b/tests/system-test/2-query/mavg.py index 13d2b4d420..fa2d0f47a4 100644 --- a/tests/system-test/2-query/mavg.py +++ b/tests/system-test/2-query/mavg.py @@ -245,6 +245,8 @@ class TDTestCase: tdSql.query(f"select {col} {alias} from {table_expr} {pre_condition}") pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'): + pre_data = np.array(pre_data, dtype = 'int64') pre_mavg = np.convolve(pre_data, np.ones(k), "valid")/k tdSql.query(self.mavg_query_form( sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr, @@ -291,6 +293,8 @@ class TDTestCase: # print(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") if not tdSql.queryResult: pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] + if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'): + pre_result = np.array(pre_result, dtype = 'int64') pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k tdSql.query(self.mavg_query_form( diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index d6ac4d4208..3d9efea938 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -196,11 +196,13 @@ class TDTestCase: showMsg = 1 showRow = 1 - shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> nul 2>&1 &" else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -312,12 +314,13 @@ class TDTestCase: pollDelay = 100 showMsg = 1 showRow = 1 - - shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) if (platform.system().lower() == 'windows'): - shellCmd += "> nul 2>&1 &" + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -448,11 +451,13 @@ class TDTestCase: showMsg = 1 showRow = 1 - shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> nul 2>&1 &" else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/db.py b/tests/system-test/7-tmq/db.py index 70d02c4e29..fd793fd841 100644 --- a/tests/system-test/7-tmq/db.py +++ b/tests/system-test/7-tmq/db.py @@ -98,15 +98,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 279518d283..b2c569e31e 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -81,15 +81,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index b0b8b06076..c9f256ed74 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -81,15 +81,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index 9af78ce6c3..44c24eb616 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -81,15 +81,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -291,8 +295,12 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] + tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName'])) + countOfStb = tdSql.getData(0,0) + print ("====total rows of stb: %d"%countOfStb) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.exit("tmq consume rows error!") tdLog.info("again start consume processer") @@ -361,7 +369,10 @@ class TDTestCase: time.sleep(2) tdLog.info("pkill consume processor") - os.system('pkill tmq_sim') + if (platform.system().lower() == 'windows'): + os.system("TASKKILL /F /IM tmq_sim.exe") + else: + os.system('pkill tmq_sim') expectRows = 0 resultList = self.selectConsumeResult(expectRows) @@ -433,7 +444,10 @@ class TDTestCase: time.sleep(5) tdLog.info("pkill consume processor") - os.system('pkill tmq_sim') + if (platform.system().lower() == 'windows'): + os.system("TASKKILL /F /IM tmq_sim.exe") + else: + os.system('pkill tmq_sim') expectRows = 0 resultList = self.selectConsumeResult(expectRows) diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 9f308abd7c..4f70340b5a 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeStb0.py b/tests/system-test/7-tmq/subscribeStb0.py index f7e56b4550..65eaab897d 100644 --- a/tests/system-test/7-tmq/subscribeStb0.py +++ b/tests/system-test/7-tmq/subscribeStb0.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeStb1.py b/tests/system-test/7-tmq/subscribeStb1.py index 4098d151d1..90d77dba0d 100644 --- a/tests/system-test/7-tmq/subscribeStb1.py +++ b/tests/system-test/7-tmq/subscribeStb1.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeStb2.py b/tests/system-test/7-tmq/subscribeStb2.py index 45feb21019..74caa139f1 100644 --- a/tests/system-test/7-tmq/subscribeStb2.py +++ b/tests/system-test/7-tmq/subscribeStb2.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeStb3.py b/tests/system-test/7-tmq/subscribeStb3.py index 81105f5352..e6eaa17564 100644 --- a/tests/system-test/7-tmq/subscribeStb3.py +++ b/tests/system-test/7-tmq/subscribeStb3.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/7-tmq/subscribeStb4.py b/tests/system-test/7-tmq/subscribeStb4.py index a6f1cab4a4..d8a9ca95b0 100644 --- a/tests/system-test/7-tmq/subscribeStb4.py +++ b/tests/system-test/7-tmq/subscribeStb4.py @@ -93,15 +93,19 @@ class TDTestCase: return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): - shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) diff --git a/tests/system-test/fulltest.bat b/tests/system-test/fulltest.bat index d2b808e83a..8a11ad42d4 100644 --- a/tests/system-test/fulltest.bat +++ b/tests/system-test/fulltest.bat @@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udf_create.py -@REM python3 .\test.py -f 0-others\udf_restart_taosd.py +python3 .\test.py -f 0-others\udf_restart_taosd.py python3 .\test.py -f 0-others\cachelast.py python3 .\test.py -f 0-others\user_control.py @@ -21,7 +21,7 @@ python3 .\test.py -f 1-insert\alter_table.py python3 .\test.py -f 2-query\between.py python3 .\test.py -f 2-query\distinct.py python3 .\test.py -f 2-query\varchar.py -@REM python3 .\test.py -f 2-query\ltrim.py +python3 .\test.py -f 2-query\ltrim.py python3 .\test.py -f 2-query\rtrim.py python3 .\test.py -f 2-query\length.py python3 .\test.py -f 2-query\char_length.py @@ -32,12 +32,12 @@ python3 .\test.py -f 2-query\join2.py python3 .\test.py -f 2-query\cast.py python3 .\test.py -f 2-query\union.py python3 .\test.py -f 2-query\union1.py -@REM python3 .\test.py -f 2-query\concat.py +python3 .\test.py -f 2-query\concat.py python3 .\test.py -f 2-query\concat2.py python3 .\test.py -f 2-query\concat_ws.py python3 .\test.py -f 2-query\concat_ws2.py -@REM python3 .\test.py -f 2-query\check_tsdb.py -@REM python3 .\test.py -f 2-query\spread.py +python3 .\test.py -f 2-query\check_tsdb.py +python3 .\test.py -f 2-query\spread.py @REM python3 .\test.py -f 2-query\hyperloglog.py python3 .\test.py -f 2-query\timezone.py @@ -71,7 +71,7 @@ python3 .\test.py -f 2-query\tan.py python3 .\test.py -f 2-query\arcsin.py python3 .\test.py -f 2-query\arccos.py python3 .\test.py -f 2-query\arctan.py -@REM python3 .\test.py -f 2-query\query_cols_tags_and_or.py +python3 .\test.py -f 2-query\query_cols_tags_and_or.py @REM # python3 .\test.py -f 2-query\nestedQuery.py @REM # TD-15983 subquery output duplicate name column. @REM # Please Xiangyang Guo modify the following script @@ -79,24 +79,24 @@ python3 .\test.py -f 2-query\arctan.py python3 .\test.py -f 2-query\avg.py python3 .\test.py -f 2-query\elapsed.py -@REM python3 .\test.py -f 2-query\csum.py +python3 .\test.py -f 2-query\csum.py python3 .\test.py -f 2-query\mavg.py python3 .\test.py -f 2-query\diff.py python3 .\test.py -f 2-query\sample.py -@REM python3 .\test.py -f 2-query\function_diff.py +python3 .\test.py -f 2-query\function_diff.py python3 .\test.py -f 2-query\unique.py python3 .\test.py -f 2-query\stateduration.py python3 .\test.py -f 2-query\function_stateduration.py python3 .\test.py -f 2-query\statecount.py -@REM python3 .\test.py -f 7-tmq\basic5.py -@REM python3 .\test.py -f 7-tmq\subscribeDb.py -@REM python3 .\test.py -f 7-tmq\subscribeDb0.py -@REM python3 .\test.py -f 7-tmq\subscribeDb1.py -@REM python3 .\test.py -f 7-tmq\subscribeStb.py -@REM python3 .\test.py -f 7-tmq\subscribeStb0.py -@REM python3 .\test.py -f 7-tmq\subscribeStb1.py -@REM python3 .\test.py -f 7-tmq\subscribeStb2.py -@REM python3 .\test.py -f 7-tmq\subscribeStb3.py -@REM python3 .\test.py -f 7-tmq\subscribeStb4.py -@REM python3 .\test.py -f 7-tmq\db.py \ No newline at end of file +python3 .\test.py -f 7-tmq\basic5.py +python3 .\test.py -f 7-tmq\subscribeDb.py +python3 .\test.py -f 7-tmq\subscribeDb0.py +python3 .\test.py -f 7-tmq\subscribeDb1.py +python3 .\test.py -f 7-tmq\subscribeStb.py +python3 .\test.py -f 7-tmq\subscribeStb0.py +python3 .\test.py -f 7-tmq\subscribeStb1.py +python3 .\test.py -f 7-tmq\subscribeStb2.py +python3 .\test.py -f 7-tmq\subscribeStb3.py +python3 .\test.py -f 7-tmq\subscribeStb4.py +python3 .\test.py -f 7-tmq\db.py \ No newline at end of file diff --git a/tests/system-test/test-all.bat b/tests/system-test/test-all.bat index 819de3d87e..076be6563a 100644 --- a/tests/system-test/test-all.bat +++ b/tests/system-test/test-all.bat @@ -22,10 +22,11 @@ echo Windows Taosd Test for /F "usebackq tokens=*" %%i in (simpletest.bat) do ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( set /a a+=1 + set timeNow=!time! echo !a! Processing %%i - call :GetTimeSeconds !time! + call :GetTimeSeconds !timeNow! set time1=!_timeTemp! - echo Start at !time! + echo Start at !timeNow! call %%i ARG1 > result_!a!.txt 2>error_!a!.txt if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. ) ) @@ -45,10 +46,11 @@ for /F "usebackq tokens=*" %%i in (simpletest.bat) do ( exit :colorEcho -call :GetTimeSeconds %time% +set timeNow=%time% +call :GetTimeSeconds %timeNow% set time2=%_timeTemp% set /a interTime=%time2% - %time1% -echo End at %time% , cast %interTime%s +echo End at %timeNow% , cast %interTime%s echo off "%~2" findstr /v /a:%1 /R "^$" "%~2" nul @@ -62,14 +64,14 @@ set tt=%tt::= % set index=1 for %%a in (%tt%) do ( if !index! EQU 1 ( - set hh=%%a + set /a hh=%%a )^ else if !index! EQU 2 ( - set mm=%%a + set /a mm=%%a )^ else if !index! EQU 3 ( - set ss=%%a + set /a ss=%%a ) set /a index=index+1 ) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 4d9e2275f4..2710a00a72 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -128,7 +128,12 @@ void initLogFile() { sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString)); //sprintf(filename, "%s/../log/tmqlog.txt", configDir); - +#ifdef WINDOWS + for (int i = 2; i < sizeof(filename); i++) { + if (filename[i] == ':') filename[i] = '-'; + if (filename[i] == '\0') break; + } +#endif TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", filename); @@ -308,8 +313,10 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) taos_print_row(buf, row, fields, numOfFields); + const char* tbName = tmq_get_table_name(msg); + if (0 != g_stConfInfo.showRowFlag) { - taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); + taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf); if (0 != g_stConfInfo.saveRowFlag) { saveConsumeContentToTbl(pInfo, buf); } @@ -356,6 +363,8 @@ void build_consumer(SThreadInfo* pInfo) { tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); } + tmq_conf_set(conf, "msg.with.table.name", "true"); + // tmq_conf_set(conf, "client.id", "c-001"); // tmq_conf_set(conf, "enable.auto.commit", "true"); @@ -528,6 +537,12 @@ void* consumeThreadFunc(void* param) { // save consume result into consumeresult table saveConsumeResult(pInfo); + // save rows from per vgroup + taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId); + for (int32_t i = 0; i < pInfo->numOfVgroups; i++) { + taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]); + } + return NULL; } diff --git a/tools/taos-tools b/tools/taos-tools index 717f5aaa5f..1446be9516 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda +Subproject commit 1446be95164e6cda3ff88270f7cfa50d8430503f