diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 16444c07f2..a9f8868f50 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG be729ab + GIT_TAG cc43ef0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index cb9502b2b2..7bde332c8c 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 85b582b + GIT_TAG d58230c SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 8f7808cf02..eaddf4e983 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -171,7 +171,8 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + STag* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fb99ba5361..1028a899b6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1235,6 +1235,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { taosArrayDestroy(pBlock->pDataBlock); pBlock->pDataBlock = NULL; taosMemoryFreeClear(pBlock->pBlockAgg); + taosMemoryFree(pBlock->info.pTag); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } @@ -1317,7 +1318,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { pBlock->info.rows = 0; pBlock->info.type = type; pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + - sizeof(TSKEY) + TSDB_TABLE_NAME_LEN; + sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; pBlock->info.watermark = INT64_MIN; pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); @@ -1345,7 +1346,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { // table name infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = TSDB_TABLE_NAME_LEN; + infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; taosArrayPush(pBlock->pDataBlock, &infoData); return pBlock; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fa291177c9..ce6b3b0656 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -677,7 +677,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // build stream obj from request if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { - /*ASSERT(0);*/ mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7230b6232f..521d12fdab 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -84,26 +84,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem continue; } - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, - }; - STag* pTag = NULL; - taosArrayClear(tagArray); - taosArrayPush(tagArray, &tagVal); - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(tagArray); - return NULL; - } - - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - // STag* pTag = NULL; // taosArrayClear(tagArray); // SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); @@ -126,42 +106,76 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem // } SVCreateTbReq createTbReq = {0}; + + // set const createTbReq.flags = 0; createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + // set super table name SName name = {0}; tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; + taosArrayPush(tagArray, &tagVal); + createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(tagArray); + taosArrayDestroyP(schemaReqs, taosMemoryFree); + taosArrayDestroy(schemaReqSz); + return NULL; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + createTbReq.ctb.tagName = tagName; + + // set table name if (pDataBlock->info.parTbName[0]) { createTbReq.name = strdup(pDataBlock->info.parTbName); } else { createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); } - createTbReq.ctb.suid = suid; - createTbReq.ctb.pTag = (uint8_t*)pTag; - createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); - createTbReq.ctb.tagName = tagName; - + // save schema len int32_t code; int32_t schemaLen; tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); if (code < 0) { tdDestroySVCreateTbReq(&createTbReq); taosArrayDestroy(tagArray); - taosMemoryFreeClear(ret); + taosArrayDestroyP(schemaReqs, taosMemoryFree); + taosArrayDestroy(schemaReqSz); return NULL; } + taosArrayPush(schemaReqSz, &schemaLen); + // save schema str void* schemaStr = taosMemoryMalloc(schemaLen); if (schemaStr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + taosArrayDestroyP(schemaReqs, taosMemoryFree); + taosArrayDestroy(schemaReqSz); return NULL; } taosArrayPush(schemaReqs, &schemaStr); - taosArrayPush(schemaReqSz, &schemaLen); SEncoder encoder = {0}; tEncoderInit(&encoder, schemaStr, schemaLen); @@ -169,6 +183,10 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + taosArrayDestroyP(schemaReqs, taosMemoryFree); + taosArrayDestroy(schemaReqSz); + tEncoderClear(&encoder); return NULL; } tEncoderClear(&encoder); @@ -221,10 +239,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem tqDebug("tq sink, convert block %d, rows: %d", i, rows); int32_t dataLen = 0; - - void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); - int32_t schemaLen = 0; + void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); if (createTb) { schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i); void* schemaStr = taosArrayGetP(schemaReqs, i); @@ -262,7 +278,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ret->length = htonl(ret->length); - if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree); + taosArrayDestroyP(schemaReqs, taosMemoryFree); taosArrayDestroy(schemaReqSz); return ret; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76f45a1cef..df7e5ff06f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1300,6 +1300,56 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } +static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) { + if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; + if (pBlock == NULL || pBlock->info.rows == 0) return; + + SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0); + ASSERT(pSrcBlock->info.rows == 1); + + blockDataEnsureCapacity(pResBlock, 1); + + projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + + // build tagArray + // build STag + // set STag + + blockDataDestroy(pSrcBlock); +} + +static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { + if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return; + if (pBlock == NULL || pBlock->info.rows == 0) return; + + SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0); + ASSERT(pSrcBlock->info.rows == 1); + + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + + projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + + void* pData = colDataGetData(pCol, 0); + // TODO check tbname validation + if (pData != (void*)-1 && pData != NULL) { + memcpy(pBlock->info.parTbName, varDataVal(pData), varDataLen(pData)); + } else { + pBlock->info.parTbName[0] = 0; + } + + blockDataDestroy(pSrcBlock); + blockDataDestroy(pResBlock); +} + void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); @@ -1421,28 +1471,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); - if (pInfo->tbnameCalSup.numOfExprs > 0 && pInfo->pRes->info.rows > 0) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pInfo->pRes, 0); - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetData(pCol, 0); - // TODO check tbname validation - if (pData != (void*)-1) { - memcpy(pInfo->pRes->info.parTbName, varDataVal(pData), varDataLen(pData)); - } else { - pInfo->pRes->info.parTbName[0] = 0; - } - blockDataDestroy(pTmpBlock); - blockDataDestroy(pResBlock); - } + calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes); return 0; } @@ -1780,6 +1809,7 @@ FETCH_NEXT_BLOCK: pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); // printDataBlock(pSDB, "stream scan update"); + calBlockTbName(&pInfo->tbnameCalSup, pSDB); return pSDB; } blockDataCleanup(pInfo->pUpdateDataRes); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 5e70dce72d..8f97f78556 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -492,18 +492,21 @@ int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer + 1, fnameStr); - TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE); - if (pMataFile == NULL) { + TdFilePtr pMetaFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE); + if (pMetaFile == NULL) { return -1; } char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); - if (len != taosWriteFile(pMataFile, serialized, len)) { + if (len != taosWriteFile(pMetaFile, serialized, len)) { // TODO:clean file + + taosCloseFile(&pMetaFile); + taosRemoveFile(fnameStr); return -1; } - taosCloseFile(&pMataFile); + taosCloseFile(&pMetaFile); // delete old file if (metaVer > -1) { walBuildMetaName(pWal, metaVer, fnameStr); diff --git a/tests/system-test/0-others/show.py b/tests/system-test/0-others/show.py new file mode 100644 index 0000000000..673e795297 --- /dev/null +++ b/tests/system-test/0-others/show.py @@ -0,0 +1,60 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +import subprocess +from util.common import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.dbname = 'db' + self.ins_param_list = ['dnodes','mnodes','qnodes','cluster','functions','users','grants','topics','subscriptions','streams'] + self.perf_param = ['apps','connections','consumers','queries','transactions'] + self.perf_param_list = ['apps','connections','consumers','queries','trans'] + + def ins_check(self): + for param in self.ins_param_list: + tdSql.query(f'show {param}') + show_result = tdSql.queryResult + tdSql.query(f'select * from information_schema.ins_{param}') + select_result = tdSql.queryResult + tdSql.checkEqual(show_result,select_result) + + def perf_check(self): + for param in range(len(self.perf_param_list)): + tdSql.query(f'show {self.perf_param[param]}') + if len(tdSql.queryResult) != 0: + show_result = tdSql.queryResult[0][0] + tdSql.query(f'select * from performance_schema.perf_{self.perf_param_list[param]}') + select_result = tdSql.queryResult[0][0] + tdSql.checkEqual(show_result,select_result) + else : + continue + def run(self): + tdSql.prepare() + self.ins_check() + self.perf_check() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index d4ac3e2844..1c6aef6286 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -37,7 +37,7 @@ python3 ./test.py -f 1-insert/table_param_ttl.py -R python3 ./test.py -f 1-insert/update_data_muti_rows.py python3 ./test.py -f 1-insert/db_tb_name_check.py python3 ./test.py -f 1-insert/database_pre_suf.py - +python3 ./test.py -f 0-others/show.py python3 ./test.py -f 2-query/abs.py python3 ./test.py -f 2-query/abs.py -R python3 ./test.py -f 2-query/and_or_for_byte.py