From f6200dd923153163717a520de0828e1dc3c8beb0 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 26 Mar 2024 11:08:16 +0800 Subject: [PATCH] support long ctb names --- include/common/tcommon.h | 2 +- include/libs/function/functionMgt.h | 1 + include/libs/scalar/scalar.h | 1 + source/dnode/mnode/impl/src/mndSma.c | 29 +++-- source/libs/function/src/builtins.c | 24 ++++ source/libs/parser/src/parTranslater.c | 154 ++++++++++-------------- source/libs/planner/src/planOptimizer.c | 75 +++++------- source/libs/scalar/src/sclfunc.c | 30 +++++ tests/script/tsim/query/tableCount.sim | 2 +- tests/system-test/2-query/tsma.py | 51 +++++--- 10 files changed, 204 insertions(+), 165 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d543d8caab..58bf4f6365 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -415,7 +415,7 @@ typedef struct STUidTagInfo { int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime); #define TSMA_RES_STB_POSTFIX "_tsma_res_stb_" -#define TSMA_RES_CTB_PREFIX_LEN 33 // md5 output(32) and _ +#define MD5_OUTPUT_LEN 32 static inline bool isTsmaResSTb(const char* stbName) { const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index cff5f3f790..e77635727b 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -88,6 +88,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_LTRIM, FUNCTION_TYPE_RTRIM, FUNCTION_TYPE_SUBSTR, + FUNCTION_TYPE_MD5, // conversion function FUNCTION_TYPE_CAST = 2000, diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index ac7f3e5c20..bee8d2e943 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -72,6 +72,7 @@ int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t md5Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput); /* Conversion functions */ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 8c90795472..88dff29d30 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -70,7 +70,7 @@ typedef struct SCreateTSMACxt { const SDbObj * pDb; SStbObj * pSrcStb; SSmaObj * pSma; - const SSmaObj * pRecursiveSma; + const SSmaObj * pBaseSma; SCMCreateStreamReq *pCreateStreamReq; SMDropStreamReq * pDropStreamReq; const char * streamName; @@ -1396,7 +1396,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) { memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN); - if (pCxt->pRecursiveSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pRecursiveSma->name, TSDB_TABLE_FNAME_LEN); + if (pCxt->pBaseSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN); pCxt->pSma->createdTime = taosGetTimestampMs(); pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); @@ -1448,7 +1448,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON; pCxt->pCreateStreamReq->maxDelay = 10000; pCxt->pCreateStreamReq->watermark = 0; - pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags : 0; + pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1; pCxt->pCreateStreamReq->checkpointFreq = 0; pCxt->pCreateStreamReq->createStb = 1; pCxt->pCreateStreamReq->targetStbUid = 0; @@ -1460,10 +1460,10 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); // construct tags + pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField)); + SField f = {0}; if (pCxt->pSrcStb) { - pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField)); for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) { - SField f = {0}; SSchema *pSchema = &pCxt->pSrcStb->pTags[idx]; f.bytes = pSchema->bytes; f.type = pSchema->type; @@ -1472,6 +1472,11 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { taosArrayPush(pCxt->pCreateStreamReq->pTags, &f); } } + f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE; + f.flags = COL_SMA_ON; + f.type = TSDB_DATA_TYPE_BINARY; + tstrncpy(f.name, "tbname", strlen("tbname") + 1); + taosArrayPush(pCxt->pCreateStreamReq->pTags, &f); } static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) { @@ -1604,7 +1609,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { SDbObj * pDb = NULL; SStbObj * pStb = NULL; SSmaObj * pSma = NULL; - SSmaObj * pRecursiveTsma = NULL; + SSmaObj * pBaseTsma = NULL; SStreamObj * pStream = NULL; int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); SMCreateSmaReq createReq = {0}; @@ -1673,8 +1678,8 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { } if (createReq.recursiveTsma) { - pRecursiveTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName); - if (!pRecursiveTsma) { + pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName); + if (!pBaseTsma) { mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName); terrno = TSDB_CODE_MND_SMA_NOT_EXIST; goto _OVER; @@ -1690,7 +1695,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { .pDb = pDb, .pRpcReq = pReq, .pSma = NULL, - .pRecursiveSma = pRecursiveTsma, + .pBaseSma = pBaseTsma, .pSrcStb = pStb, }; @@ -1703,7 +1708,7 @@ _OVER: } if (pStb) mndReleaseStb(pMnode, pStb); - if (pRecursiveTsma) mndReleaseSma(pMnode, pRecursiveTsma); + if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma); mndReleaseSma(pMnode, pSma); mndReleaseStream(pMnode, pStream); mndReleaseDb(pMnode, pDb); @@ -1871,8 +1876,9 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo while (numOfRows < rows) { pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma); if (pIter->pSmaIter == NULL) break; + SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db); - if (pDb && pSma->dbUid != pDb->uid) { + if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) { sdbRelease(pMnode->pSdb, pSma); continue; } @@ -1915,7 +1921,6 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo // interval char interval[64 + VARSTR_HEADER_SIZE] = {0}; - SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db); int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, getPrecisionUnit(pSrcDb->cfg.precision)); varDataSetLen(interval, len); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 6cf2019127..fc94d5c2ca 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2506,6 +2506,20 @@ static int32_t translateTableCountPseudoColumn(SFunctionNode* pFunc, char* pErrB return TSDB_CODE_SUCCESS; } +static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t para1Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; + if (para1Type != TSDB_DATA_TYPE_VARCHAR) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR}; + return TSDB_CODE_SUCCESS; +} + // clang-format off const SBuiltinFuncDefinition funcMgtBuiltins[] = { { @@ -4094,6 +4108,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = hllFunctionMerge, .finalizeFunc = hllPartialFinalize, }, + { + .name = "md5", + .type = FUNCTION_TYPE_MD5, + .classification = FUNC_MGT_SCALAR_FUNC, + .translateFunc = translateMd5, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = md5Function, + .finalizeFunc = NULL + }, }; // clang-format on diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index da12cbdc17..5cf5e09a78 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3669,15 +3669,15 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, taosMemoryFree); pRealTable->tsmaTargetTbVgInfo = NULL; } - char buf[TSDB_TABLE_FNAME_LEN]; + char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1]; for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) { STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i); SName tsmaTargetTbName = {0}; toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName); - int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); + int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name, + pRealTable->table.tableName); len = taosCreateMD5Hash(buf, len); - len = sprintf(buf + len, "_%s", pRealTable->table.tableName); - strncpy(tsmaTargetTbName.tname, buf, TSDB_TABLE_NAME_LEN); + strncpy(tsmaTargetTbName.tname, buf, strlen(buf)); collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables); SVgroupInfo vgInfo = {0}; bool exists = false; @@ -5697,15 +5697,14 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt* for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) { const char* pTbName = taosArrayGetP(pInfo->aTbnames, k); - char* pNewTbName = taosMemoryCalloc(1, TSMA_RES_CTB_PREFIX_LEN + strlen(pTbName) + 1); + char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1); if (!pNewTbName) { code = TSDB_CODE_OUT_OF_MEMORY; break; } taosArrayPush(pTbNames, &pNewTbName); - sprintf(pNewTbName, "%s.%s", pTsma->dbFName, pTsma->name); + sprintf(pNewTbName, "%s.%s_%s", pTsma->dbFName, pTsma->name, pTbName); int32_t len = taosCreateMD5Hash(pNewTbName, strlen(pNewTbName)); - sprintf(pNewTbName + len, "_%s", pTbName); } if (TSDB_CODE_SUCCESS == code) { vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo)); @@ -7946,21 +7945,10 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p return code; } -static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableStmt* pStmt) { - SNode* pNode; - // note that there could have normal tables - - - - return TSDB_CODE_SUCCESS; -} - static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) { SDropTableClause* pClause = (SDropTableClause*)nodesListGetNode(pStmt->pTables, 0); SName tableName; - if (pStmt->withTsma) { - return doTranslateDropCtbsWithTsma(pCxt, pStmt); - } + if (pStmt->withTsma) return TSDB_CODE_SUCCESS; return doTranslateDropSuperTable( pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), pClause->ignoreNotExists); } @@ -10556,68 +10544,44 @@ static bool sortColWithColId(SNode* pNode1, SNode* pNode2) { return pCol1->colId < pCol2->colId; } -static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbNameFunc, - SFunctionNode** pConcatFuncOut) { +static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname, SNode** pSubTable) { int32_t code = 0; - SFunctionNode* pSubstrFunc = NULL; - SNode* pRes = NULL; - SValueNode* pTsmaNameHashVNode = NULL; + SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (!pMd5Func || !pConcatFunc || !pVal) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + sprintf(pMd5Func->functionName, "%s", "md5"); + sprintf(pConcatFunc->functionName, "%s", "concat"); + pVal->literal = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN + 1); + if (!pVal->literal) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + sprintf(pVal->literal, "%s_", pReq->name); + pVal->node.resType.type = TSDB_DATA_TYPE_VARCHAR; + pVal->node.resType.bytes = strlen(pVal->literal); + code = nodesListMakeAppend(&pConcatFunc->pParameterList, (SNode*)pVal); + if (code != TSDB_CODE_SUCCESS) goto _end; + pVal = NULL; - if (!pConcatFunc) code = TSDB_CODE_OUT_OF_MEMORY; + // not recursive tsma, md5(concat('1.test.tsma1_', tbname)) + // recursive tsma, md5(concat('1.test.tsma1_', `tbname`)), `tbname` is the last tag + code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbname)); + if (code != TSDB_CODE_SUCCESS) goto _end; - if (code == TSDB_CODE_SUCCESS) { - snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); - code = nodesListMakeStrictAppend(&pConcatFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE)); - } + code = nodesListMakeAppend(&pMd5Func->pParameterList, (SNode*)pConcatFunc); + if (code != TSDB_CODE_SUCCESS) goto _end; + pConcatFunc = NULL; + *pSubTable = (SNode*)pMd5Func; - if (TSDB_CODE_SUCCESS == code) { - pTsmaNameHashVNode = (SValueNode*)nodesListGetNode(pConcatFunc->pParameterList, 0); - pTsmaNameHashVNode->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1); - if (!pTsmaNameHashVNode->literal) code = TSDB_CODE_OUT_OF_MEMORY; - } - if (TSDB_CODE_SUCCESS == code) { - sprintf(pTsmaNameHashVNode->literal, "%s", pReq->name); - int32_t len = taosCreateMD5Hash(pTsmaNameHashVNode->literal, strlen(pTsmaNameHashVNode->literal)); - ASSERT(len == TSMA_RES_CTB_PREFIX_LEN - 1); - sprintf(pTsmaNameHashVNode->literal + len, "_"); - pTsmaNameHashVNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR; - pTsmaNameHashVNode->node.resType.bytes = strlen(pTsmaNameHashVNode->literal); - } - - if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->recursiveTsma) { - pSubstrFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - if (!pSubstrFunc) code = TSDB_CODE_OUT_OF_MEMORY; - if (TSDB_CODE_SUCCESS == code) { - snprintf(pSubstrFunc->functionName, TSDB_FUNC_NAME_LEN, "substr"); - code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesCloneNode(pTbNameFunc)); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE)); - if (TSDB_CODE_SUCCESS == code) { - SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode; - pV->literal = taosMemoryCalloc(1, 64); - if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY; - sprintf(pV->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1); - pV->isDuration = false; - pV->translate = false; - pV->node.resType.type = TSDB_DATA_TYPE_INT; - pV->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; - } - } - } - if (TSDB_CODE_SUCCESS == code) { - if (pSubstrFunc) { - code = nodesListAppend(pConcatFunc->pParameterList, (SNode*)pSubstrFunc); - } else { - code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbNameFunc)); - } - } - if (TSDB_CODE_SUCCESS == code) { - *pConcatFuncOut = pConcatFunc; - } else { - nodesDestroyNode((SNode*)pSubstrFunc); - nodesDestroyNode((SNode*)pConcatFunc); +_end: + if (code) { + if (pMd5Func) nodesDestroyNode((SNode*)pMd5Func); + if (pConcatFunc) nodesDestroyNode((SNode*)pConcatFunc); + if (pVal) nodesDestroyNode((SNode*)pVal); } return code; } @@ -10633,33 +10597,38 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval); if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY; + SFunctionNode* pTbnameFunc = NULL; if (TSDB_CODE_SUCCESS == code) { // append partition by tbname - SFunctionNode* pTbnameFunc = (SFunctionNode*)createTbnameFunction(); + pTbnameFunc = (SFunctionNode*)createTbnameFunction(); if (pTbnameFunc) { sprintf(pTbnameFunc->node.userAlias, "tbname"); - nodesListMakeAppend(&info.pPartitionByList, (SNode*)pTbnameFunc); + code = nodesListMakeStrictAppend(&info.pPartitionByList, (SNode*)pTbnameFunc); } else { code = TSDB_CODE_OUT_OF_MEMORY; } - if (code == TSDB_CODE_SUCCESS) { - SFunctionNode* pConcatFunc = NULL; - code = buildTSMAAstMakeConcatFuncNode(pStmt, pReq, (const SNode*)pTbnameFunc, &pConcatFunc); - if (code == TSDB_CODE_SUCCESS) { - info.pSubTable = (SNode*)pConcatFunc; - } - } } if (TSDB_CODE_SUCCESS == code) { // append partition by tags + SNode* pTagCol = NULL; for (int32_t idx = 0; idx < numOfTags; ++idx) { - SNode* pTagCol = createColumnNodeWithName(pTags[idx].name); + pTagCol = createColumnNodeWithName(pTags[idx].name); if (!pTagCol) { code = TSDB_CODE_OUT_OF_MEMORY; break; } nodesListAppend(info.pPartitionByList, pTagCol); - nodesListMakeAppend(&info.pTags, nodesCloneNode(pTagCol)); + code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode(pTagCol)); + } + + // sub table + if (code == TSDB_CODE_SUCCESS) { + SFunctionNode* pSubTable = NULL; + code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable); + if (code == TSDB_CODE_SUCCESS) { + info.pSubTable = (SNode*)pSubTable; + } + code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode((SNode*)pTbnameFunc)); } } @@ -12405,7 +12374,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { pStmt->withTsma = pTsmas && pTsmas->size > 0; } pClause->pTsmas = pTsmas; - if (tableType == TSDB_NORMAL_TABLE && pStmt->withTsma) { + if (tableType == TSDB_NORMAL_TABLE && pTsmas && pTsmas->size > 0) { taosHashCleanup(pVgroupHashmap); return TSDB_CODE_TSMA_MUST_BE_DROPPED; } @@ -12453,6 +12422,13 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, SVAlterTbReq* pReq) { + SName tbName = {0}; + SArray* pTsmas = NULL; + toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName); + int32_t code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas); + if (code != TSDB_CODE_SUCCESS) return code; + if (pTsmas && pTsmas->size > 0) return TSDB_CODE_TSMA_MUST_BE_DROPPED; + SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName); if (NULL == pSchema) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid tag name: %s", @@ -12469,8 +12445,6 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS pReq->colId = pSchema->colId; pReq->tagType = pSchema->type; - int32_t code = 0; - STag* pTag = NULL; SToken token; char tokenBuf[TSDB_MAX_TAGS_LEN]; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 29dc28c11e..86a2e9f1b7 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6175,57 +6175,46 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode, const STSMAOptUsefulTsma* pTsma) { - int32_t code = 0; - SFunctionNode* pRewrittenFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - if (!pRewrittenFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = 0; + SExprNode* pRewrittenFunc = (SExprNode*)nodesMakeNode(pTsma ? QUERY_NODE_COLUMN : QUERY_NODE_FUNCTION); + SValueNode* pValue = NULL; + if (!pRewrittenFunc) code = TSDB_CODE_OUT_OF_MEMORY; if (code == TSDB_CODE_SUCCESS) { - pRewrittenFunc->node.resType = ((SExprNode*)(*pTbNameNode))->resType; - pValue->translate = true; + pRewrittenFunc->resType = ((SExprNode*)(*pTbNameNode))->resType; } if (pTsma && code == TSDB_CODE_SUCCESS) { - // TODO tsma test child tbname too long - // if with tsma, we replace func tbname with substr(tbname, TSMA_RES_CTB_PREFIX_LEN) - pRewrittenFunc->funcId = fmGetFuncId("substr"); - snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr"); - pValue->node.resType.type = TSDB_DATA_TYPE_INT; - pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; - pValue->literal = taosMemoryCalloc(1, 16); - pValue->datum.i = TSMA_RES_CTB_PREFIX_LEN + 1; - if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY; - if (code == TSDB_CODE_SUCCESS) { - sprintf(pValue->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1); - code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, *pTbNameNode); - } - if (code == TSDB_CODE_SUCCESS) { - code = nodesListAppend(pRewrittenFunc->pParameterList, (SNode*)pValue); - } + nodesDestroyNode(*pTbNameNode); + SColumnNode* pCol = (SColumnNode*)pRewrittenFunc; + const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, pTsma->pTsma->pTags->size - 1); + strcpy(pCol->tableName, pTsma->targetTbName); + strcpy(pCol->tableAlias, pTsma->targetTbName); + pCol->tableId = pTsma->targetTbUid; + pCol->tableType = TSDB_SUPER_TABLE; + pCol->colId = pSchema->colId; + pCol->colType = COLUMN_TYPE_TAG; } else if (code == TSDB_CODE_SUCCESS) { // if no tsma, we replace func tbname with concat('', tbname) - pRewrittenFunc->funcId = fmGetFuncId("concat"); - snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); + SFunctionNode* pFunc = (SFunctionNode*)pRewrittenFunc; + pFunc->funcId = fmGetFuncId("concat"); + snprintf(pFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); + pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (!pValue) code = TSDB_CODE_OUT_OF_MEMORY; - pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType; - pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1); - pValue->datum.p = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + VARSTR_HEADER_SIZE); - if (!pValue->literal || !pValue->datum.p) code = TSDB_CODE_OUT_OF_MEMORY; + if (code == TSDB_CODE_SUCCESS) { + pValue->translate = true; + pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType; + pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1); + pValue->datum.p = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + VARSTR_HEADER_SIZE); + if (!pValue->literal || !pValue->datum.p) code = TSDB_CODE_OUT_OF_MEMORY; + } - if (0 && code == TSDB_CODE_SUCCESS) { - sprintf(pValue->literal, "%s.%s", pTsma->pTsma->dbFName, pTsma->pTsma->name); - int32_t len = taosCreateMD5Hash(pValue->literal, strlen(pValue->literal)); - pValue->literal[len] = '_'; - strcpy(pValue->datum.p, pValue->literal); - pValue->node.resType.bytes = len + 1; - varDataSetLen(pValue->datum.p, pValue->node.resType.bytes); - strncpy(pValue->datum.p, pValue->literal, pValue->node.resType.bytes); - pRewrittenFunc->node.resType.bytes += pValue->node.resType.bytes; + if (code == TSDB_CODE_SUCCESS) { + code = nodesListMakeStrictAppend(&pFunc->pParameterList, (SNode*)pValue); + pValue = NULL; } if (code == TSDB_CODE_SUCCESS) { - code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, (SNode*)pValue); - } - if (code == TSDB_CODE_SUCCESS) { - code = nodesListStrictAppend(pRewrittenFunc->pParameterList, *pTbNameNode); + code = nodesListStrictAppend(pFunc->pParameterList, *pTbNameNode); } } @@ -6233,7 +6222,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN *pTbNameNode = (SNode*)pRewrittenFunc; } else { nodesDestroyNode((SNode*)pRewrittenFunc); - nodesDestroyNode((SNode*)pValue); + if (pValue) nodesDestroyNode((SNode*)pValue); } return code; @@ -6327,7 +6316,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew strcpy(pNewScan->tableName.tname, pTsma->targetTbName); } if (code == TSDB_CODE_SUCCESS) { - code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, false, true); + code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, true, true); } if (code == TSDB_CODE_SUCCESS) { code = tsmaOptRewriteNode(&pNewScan->pTagCond, pTsmaOptCtx, pTsma, true, true); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e732d1c587..da26d7e3b9 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -697,6 +697,36 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } +int32_t md5Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + int32_t bufLen = TMAX(MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE + 1, pInputData->info.bytes); + char* pOutputBuf = taosMemoryMalloc(bufLen); + if (!pOutputBuf) { + qError("md5 function alloc memory failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataSetNULL(pOutputData, i); + continue; + } + char *input = colDataGetData(pInput[0].columnData, i); + if (bufLen < varDataLen(input) + VARSTR_HEADER_SIZE) { + bufLen = varDataLen(input) + VARSTR_HEADER_SIZE; + pOutputBuf = taosMemoryRealloc(pOutputBuf, bufLen); + } + char *output = pOutputBuf; + memcpy(varDataVal(output), varDataVal(input), varDataLen(input)); + int32_t len = taosCreateMD5Hash(varDataVal(output), varDataLen(input)); + varDataSetLen(output, len); + colDataSetVal(pOutputData, i, output, false); + } + pOutput->numOfRows = pInput->numOfRows; + taosMemoryFree(pOutputBuf); + return TSDB_CODE_SUCCESS; +} + /** Conversion functions **/ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = GET_PARAM_TYPE(&pInput[0]); diff --git a/tests/script/tsim/query/tableCount.sim b/tests/script/tsim/query/tableCount.sim index eae82a0a1b..4c9c8ce240 100644 --- a/tests/script/tsim/query/tableCount.sim +++ b/tests/script/tsim/query/tableCount.sim @@ -97,7 +97,7 @@ endi if $data42 != 3 then return -1 endi -if $data52 != 30 then +if $data52 != 31 then return -1 endi if $data62 != 5 then diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index bb67354022..71e1db22a8 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -9,7 +9,7 @@ from util.dnodes import * from util.common import * # from tmqCommon import * -ROUND = 1000 +ROUND = 2000 class TSMA: def __init__(self): @@ -53,10 +53,7 @@ class UsedTsma: def setIsTsma(self): self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX) if not self.is_tsma_: - pos = self.name.find('_') # for tsma output child table - if pos == 32: - self.is_tsma_ = True - + self.is_tsma_ = len(self.name) == 32 # for tsma output child table class TSMAQueryContext: def __init__(self) -> None: @@ -110,6 +107,11 @@ class TSMAQCBuilder: res = tdSql.queryResult[0][0] return res.timestamp() * 1000 + def md5(self, buf: str) -> str: + tdSql.query(f'select md5("{buf}")') + res = tdSql.queryResult[0][0] + return res + def should_query_with_table(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder': used_tsma: UsedTsma = UsedTsma() used_tsma.name = tb_name @@ -118,10 +120,11 @@ class TSMAQCBuilder: used_tsma.is_tsma_ = False self.qc_.used_tsmas.append(used_tsma) return self - - def should_query_with_tsma_ctb(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder': + + def should_query_with_tsma_ctb(self, db_name: str, tsma_name: str, ctb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder': used_tsma: UsedTsma = UsedTsma() - used_tsma.name = tb_name + name = f'1.{db_name}.{tsma_name}_{ctb_name}' + used_tsma.name = self.md5(name) used_tsma.time_range_start = self.to_timestamp(ts_begin) used_tsma.time_range_end = self.to_timestamp(ts_end) used_tsma.is_tsma_ = True @@ -711,7 +714,7 @@ class TDTestCase: if ctx.has_tsma(): if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX: break - elif ctx.used_tsmas[0].name.find('_') == 32 and ctx.used_tsmas[0].name[33:] == tb: + elif len(ctx.used_tsmas[0].name) == 32 and 1: ## select md5 break else: time.sleep(1) @@ -768,14 +771,14 @@ class TDTestCase: self.test_query_child_table() self.test_skip_tsma_hint() self.test_long_tsma_name() - self.test_long_tb_name() + self.test_long_ctb_name() self.test_add_tag_col() self.test_modify_col_name_value() def test_union(self): ctxs = [] sql = 'select avg(c1) from meters union select avg(c1) from norm_tb' - ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('d2f2c89f2b3378a2a48b4cadf9c3f927_norm_tb').get_qc() + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('test', 'tsma5', 'norm_tb').get_qc() ctxs.append(ctx) sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000" union select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' ctxs.append(TSMAQCBuilder().with_sql(sql) @@ -793,7 +796,7 @@ class TDTestCase: tdSql.execute('insert into norm_tb values(now, 2)') self.create_tsma('tsma_db2_norm_t', 'db2', 'norm_tb', ['avg(c2)', 'last(ts)'], '10m') sql = 'select avg(c1) from test.meters union select avg(c2) from norm_tb' - self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('e2d730bfc1242321c58c9ab7590ac060_norm_tb').get_qc()]) + self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('db2', 'tsma_db2_norm_t', 'norm_tb').get_qc()]) tdSql.execute('drop database db2') tdSql.execute('use test') @@ -844,8 +847,19 @@ class TDTestCase: tdSql.execute(f'drop tsma {name}') - def test_long_tb_name(self): - pass + def test_long_ctb_name(self): + tb_name = self.generate_random_string(192) + tsma_name = self.generate_random_string(178) + tdSql.execute('create database db2') + tdSql.execute('use db2') + tdSql.execute(f'create table {tb_name}(ts timestamp, c2 int)') + tdSql.execute(f'insert into {tb_name} values(now, 1)') + tdSql.execute(f'insert into {tb_name} values(now, 2)') + self.create_tsma(tsma_name, 'db2', tb_name, ['avg(c2)', 'last(ts)'], '10m') + sql = f'select avg(c2), last(ts) from {tb_name}' + self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('db2', tsma_name, tb_name).get_qc()]) + tdSql.execute('drop database db2') + tdSql.execute('use test') def test_skip_tsma_hint(self): ctxs = [] @@ -858,12 +872,10 @@ class TDTestCase: def test_query_child_table(self): sql = 'select avg(c1) from t1' - ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma( - 'e8945e7385834f8c22705546d4016539_t1', UsedTsma.TS_MIN, UsedTsma.TS_MAX, child_tb=True).get_qc() + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', 'tsma2', 't1', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() self.tsma_tester.check_sql(sql, ctx) sql = 'select avg(c1) from t3' - ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma( - 'e8945e7385834f8c22705546d4016539_t3', child_tb=True).get_qc() + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', 'tsma2', 't3').get_qc() self.tsma_tester.check_sql(sql, ctx) def test_recursive_tsma(self): @@ -1104,6 +1116,9 @@ class TDTestCase: self.test_ddl() self.test_query_with_tsma() + def test_ins_tsma(self): + pass + def test_create_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------')