support long ctb names

This commit is contained in:
wangjiaming0909 2024-03-26 11:08:16 +08:00
parent 6ce0616ffd
commit f6200dd923
10 changed files with 204 additions and 165 deletions

View File

@ -415,7 +415,7 @@ typedef struct STUidTagInfo {
int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime);
#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_"
#define TSMA_RES_CTB_PREFIX_LEN 33 // md5 output(32) and _
#define MD5_OUTPUT_LEN 32
static inline bool isTsmaResSTb(const char* stbName) {
const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX);

View File

@ -88,6 +88,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_LTRIM,
FUNCTION_TYPE_RTRIM,
FUNCTION_TYPE_SUBSTR,
FUNCTION_TYPE_MD5,
// conversion function
FUNCTION_TYPE_CAST = 2000,

View File

@ -72,6 +72,7 @@ int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t md5Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
/* Conversion functions */
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -70,7 +70,7 @@ typedef struct SCreateTSMACxt {
const SDbObj * pDb;
SStbObj * pSrcStb;
SSmaObj * pSma;
const SSmaObj * pRecursiveSma;
const SSmaObj * pBaseSma;
SCMCreateStreamReq *pCreateStreamReq;
SMDropStreamReq * pDropStreamReq;
const char * streamName;
@ -1396,7 +1396,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) {
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
if (pCxt->pRecursiveSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pRecursiveSma->name, TSDB_TABLE_FNAME_LEN);
if (pCxt->pBaseSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->createdTime = taosGetTimestampMs();
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
@ -1448,7 +1448,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
pCxt->pCreateStreamReq->maxDelay = 10000;
pCxt->pCreateStreamReq->watermark = 0;
pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags : 0;
pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
pCxt->pCreateStreamReq->checkpointFreq = 0;
pCxt->pCreateStreamReq->createStb = 1;
pCxt->pCreateStreamReq->targetStbUid = 0;
@ -1460,10 +1460,10 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
// construct tags
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
SField f = {0};
if (pCxt->pSrcStb) {
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField));
for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) {
SField f = {0};
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
f.bytes = pSchema->bytes;
f.type = pSchema->type;
@ -1472,6 +1472,11 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
}
}
f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
f.flags = COL_SMA_ON;
f.type = TSDB_DATA_TYPE_BINARY;
tstrncpy(f.name, "tbname", strlen("tbname") + 1);
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
}
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
@ -1604,7 +1609,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
SDbObj * pDb = NULL;
SStbObj * pStb = NULL;
SSmaObj * pSma = NULL;
SSmaObj * pRecursiveTsma = NULL;
SSmaObj * pBaseTsma = NULL;
SStreamObj * pStream = NULL;
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
SMCreateSmaReq createReq = {0};
@ -1673,8 +1678,8 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
}
if (createReq.recursiveTsma) {
pRecursiveTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
if (!pRecursiveTsma) {
pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
if (!pBaseTsma) {
mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
goto _OVER;
@ -1690,7 +1695,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
.pDb = pDb,
.pRpcReq = pReq,
.pSma = NULL,
.pRecursiveSma = pRecursiveTsma,
.pBaseSma = pBaseTsma,
.pSrcStb = pStb,
};
@ -1703,7 +1708,7 @@ _OVER:
}
if (pStb) mndReleaseStb(pMnode, pStb);
if (pRecursiveTsma) mndReleaseSma(pMnode, pRecursiveTsma);
if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
mndReleaseSma(pMnode, pSma);
mndReleaseStream(pMnode, pStream);
mndReleaseDb(pMnode, pDb);
@ -1871,8 +1876,9 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
while (numOfRows < rows) {
pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pIter->pSmaIter == NULL) break;
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
if (pDb && pSma->dbUid != pDb->uid) {
if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
sdbRelease(pMnode->pSdb, pSma);
continue;
}
@ -1915,7 +1921,6 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
// interval
char interval[64 + VARSTR_HEADER_SIZE] = {0};
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
getPrecisionUnit(pSrcDb->cfg.precision));
varDataSetLen(interval, len);

View File

@ -2506,6 +2506,20 @@ static int32_t translateTableCountPseudoColumn(SFunctionNode* pFunc, char* pErrB
return TSDB_CODE_SUCCESS;
}
static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
if (para1Type != TSDB_DATA_TYPE_VARCHAR) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
@ -4094,6 +4108,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = hllFunctionMerge,
.finalizeFunc = hllPartialFinalize,
},
{
.name = "md5",
.type = FUNCTION_TYPE_MD5,
.classification = FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateMd5,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = md5Function,
.finalizeFunc = NULL
},
};
// clang-format on

View File

@ -3669,15 +3669,15 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, taosMemoryFree);
pRealTable->tsmaTargetTbVgInfo = NULL;
}
char buf[TSDB_TABLE_FNAME_LEN];
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) {
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
SName tsmaTargetTbName = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name);
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name,
pRealTable->table.tableName);
len = taosCreateMD5Hash(buf, len);
len = sprintf(buf + len, "_%s", pRealTable->table.tableName);
strncpy(tsmaTargetTbName.tname, buf, TSDB_TABLE_NAME_LEN);
strncpy(tsmaTargetTbName.tname, buf, strlen(buf));
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
SVgroupInfo vgInfo = {0};
bool exists = false;
@ -5697,15 +5697,14 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
char* pNewTbName = taosMemoryCalloc(1, TSMA_RES_CTB_PREFIX_LEN + strlen(pTbName) + 1);
char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1);
if (!pNewTbName) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
taosArrayPush(pTbNames, &pNewTbName);
sprintf(pNewTbName, "%s.%s", pTsma->dbFName, pTsma->name);
sprintf(pNewTbName, "%s.%s_%s", pTsma->dbFName, pTsma->name, pTbName);
int32_t len = taosCreateMD5Hash(pNewTbName, strlen(pNewTbName));
sprintf(pNewTbName + len, "_%s", pTbName);
}
if (TSDB_CODE_SUCCESS == code) {
vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
@ -7946,21 +7945,10 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
return code;
}
static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableStmt* pStmt) {
SNode* pNode;
// note that there could have normal tables
return TSDB_CODE_SUCCESS;
}
static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) {
SDropTableClause* pClause = (SDropTableClause*)nodesListGetNode(pStmt->pTables, 0);
SName tableName;
if (pStmt->withTsma) {
return doTranslateDropCtbsWithTsma(pCxt, pStmt);
}
if (pStmt->withTsma) return TSDB_CODE_SUCCESS;
return doTranslateDropSuperTable(
pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), pClause->ignoreNotExists);
}
@ -10556,68 +10544,44 @@ static bool sortColWithColId(SNode* pNode1, SNode* pNode2) {
return pCol1->colId < pCol2->colId;
}
static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbNameFunc,
SFunctionNode** pConcatFuncOut) {
static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname, SNode** pSubTable) {
int32_t code = 0;
SFunctionNode* pSubstrFunc = NULL;
SNode* pRes = NULL;
SValueNode* pTsmaNameHashVNode = NULL;
SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (!pMd5Func || !pConcatFunc || !pVal) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
sprintf(pMd5Func->functionName, "%s", "md5");
sprintf(pConcatFunc->functionName, "%s", "concat");
pVal->literal = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN + 1);
if (!pVal->literal) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
sprintf(pVal->literal, "%s_", pReq->name);
pVal->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pVal->node.resType.bytes = strlen(pVal->literal);
code = nodesListMakeAppend(&pConcatFunc->pParameterList, (SNode*)pVal);
if (code != TSDB_CODE_SUCCESS) goto _end;
pVal = NULL;
if (!pConcatFunc) code = TSDB_CODE_OUT_OF_MEMORY;
// not recursive tsma, md5(concat('1.test.tsma1_', tbname))
// recursive tsma, md5(concat('1.test.tsma1_', `tbname`)), `tbname` is the last tag
code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbname));
if (code != TSDB_CODE_SUCCESS) goto _end;
if (code == TSDB_CODE_SUCCESS) {
snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
code = nodesListMakeStrictAppend(&pConcatFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
}
code = nodesListMakeAppend(&pMd5Func->pParameterList, (SNode*)pConcatFunc);
if (code != TSDB_CODE_SUCCESS) goto _end;
pConcatFunc = NULL;
*pSubTable = (SNode*)pMd5Func;
if (TSDB_CODE_SUCCESS == code) {
pTsmaNameHashVNode = (SValueNode*)nodesListGetNode(pConcatFunc->pParameterList, 0);
pTsmaNameHashVNode->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
if (!pTsmaNameHashVNode->literal) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
sprintf(pTsmaNameHashVNode->literal, "%s", pReq->name);
int32_t len = taosCreateMD5Hash(pTsmaNameHashVNode->literal, strlen(pTsmaNameHashVNode->literal));
ASSERT(len == TSMA_RES_CTB_PREFIX_LEN - 1);
sprintf(pTsmaNameHashVNode->literal + len, "_");
pTsmaNameHashVNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pTsmaNameHashVNode->node.resType.bytes = strlen(pTsmaNameHashVNode->literal);
}
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->recursiveTsma) {
pSubstrFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (!pSubstrFunc) code = TSDB_CODE_OUT_OF_MEMORY;
if (TSDB_CODE_SUCCESS == code) {
snprintf(pSubstrFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesCloneNode(pTbNameFunc));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
if (TSDB_CODE_SUCCESS == code) {
SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode;
pV->literal = taosMemoryCalloc(1, 64);
if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY;
sprintf(pV->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1);
pV->isDuration = false;
pV->translate = false;
pV->node.resType.type = TSDB_DATA_TYPE_INT;
pV->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
if (pSubstrFunc) {
code = nodesListAppend(pConcatFunc->pParameterList, (SNode*)pSubstrFunc);
} else {
code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbNameFunc));
}
}
if (TSDB_CODE_SUCCESS == code) {
*pConcatFuncOut = pConcatFunc;
} else {
nodesDestroyNode((SNode*)pSubstrFunc);
nodesDestroyNode((SNode*)pConcatFunc);
_end:
if (code) {
if (pMd5Func) nodesDestroyNode((SNode*)pMd5Func);
if (pConcatFunc) nodesDestroyNode((SNode*)pConcatFunc);
if (pVal) nodesDestroyNode((SNode*)pVal);
}
return code;
}
@ -10633,33 +10597,38 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY;
SFunctionNode* pTbnameFunc = NULL;
if (TSDB_CODE_SUCCESS == code) {
// append partition by tbname
SFunctionNode* pTbnameFunc = (SFunctionNode*)createTbnameFunction();
pTbnameFunc = (SFunctionNode*)createTbnameFunction();
if (pTbnameFunc) {
sprintf(pTbnameFunc->node.userAlias, "tbname");
nodesListMakeAppend(&info.pPartitionByList, (SNode*)pTbnameFunc);
code = nodesListMakeStrictAppend(&info.pPartitionByList, (SNode*)pTbnameFunc);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
SFunctionNode* pConcatFunc = NULL;
code = buildTSMAAstMakeConcatFuncNode(pStmt, pReq, (const SNode*)pTbnameFunc, &pConcatFunc);
if (code == TSDB_CODE_SUCCESS) {
info.pSubTable = (SNode*)pConcatFunc;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
// append partition by tags
SNode* pTagCol = NULL;
for (int32_t idx = 0; idx < numOfTags; ++idx) {
SNode* pTagCol = createColumnNodeWithName(pTags[idx].name);
pTagCol = createColumnNodeWithName(pTags[idx].name);
if (!pTagCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
nodesListAppend(info.pPartitionByList, pTagCol);
nodesListMakeAppend(&info.pTags, nodesCloneNode(pTagCol));
code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode(pTagCol));
}
// sub table
if (code == TSDB_CODE_SUCCESS) {
SFunctionNode* pSubTable = NULL;
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable);
if (code == TSDB_CODE_SUCCESS) {
info.pSubTable = (SNode*)pSubTable;
}
code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode((SNode*)pTbnameFunc));
}
}
@ -12405,7 +12374,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
pStmt->withTsma = pTsmas && pTsmas->size > 0;
}
pClause->pTsmas = pTsmas;
if (tableType == TSDB_NORMAL_TABLE && pStmt->withTsma) {
if (tableType == TSDB_NORMAL_TABLE && pTsmas && pTsmas->size > 0) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_TSMA_MUST_BE_DROPPED;
}
@ -12453,6 +12422,13 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
SVAlterTbReq* pReq) {
SName tbName = {0};
SArray* pTsmas = NULL;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
int32_t code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas);
if (code != TSDB_CODE_SUCCESS) return code;
if (pTsmas && pTsmas->size > 0) return TSDB_CODE_TSMA_MUST_BE_DROPPED;
SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName);
if (NULL == pSchema) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid tag name: %s",
@ -12469,8 +12445,6 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq->colId = pSchema->colId;
pReq->tagType = pSchema->type;
int32_t code = 0;
STag* pTag = NULL;
SToken token;
char tokenBuf[TSDB_MAX_TAGS_LEN];

View File

@ -6175,57 +6175,46 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
SFunctionNode* pRewrittenFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (!pRewrittenFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = 0;
SExprNode* pRewrittenFunc = (SExprNode*)nodesMakeNode(pTsma ? QUERY_NODE_COLUMN : QUERY_NODE_FUNCTION);
SValueNode* pValue = NULL;
if (!pRewrittenFunc) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
pRewrittenFunc->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->translate = true;
pRewrittenFunc->resType = ((SExprNode*)(*pTbNameNode))->resType;
}
if (pTsma && code == TSDB_CODE_SUCCESS) {
// TODO tsma test child tbname too long
// if with tsma, we replace func tbname with substr(tbname, TSMA_RES_CTB_PREFIX_LEN)
pRewrittenFunc->funcId = fmGetFuncId("substr");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
pValue->node.resType.type = TSDB_DATA_TYPE_INT;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
pValue->literal = taosMemoryCalloc(1, 16);
pValue->datum.i = TSMA_RES_CTB_PREFIX_LEN + 1;
if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
sprintf(pValue->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1);
code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, *pTbNameNode);
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListAppend(pRewrittenFunc->pParameterList, (SNode*)pValue);
}
nodesDestroyNode(*pTbNameNode);
SColumnNode* pCol = (SColumnNode*)pRewrittenFunc;
const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, pTsma->pTsma->pTags->size - 1);
strcpy(pCol->tableName, pTsma->targetTbName);
strcpy(pCol->tableAlias, pTsma->targetTbName);
pCol->tableId = pTsma->targetTbUid;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->colId = pSchema->colId;
pCol->colType = COLUMN_TYPE_TAG;
} else if (code == TSDB_CODE_SUCCESS) {
// if no tsma, we replace func tbname with concat('', tbname)
pRewrittenFunc->funcId = fmGetFuncId("concat");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
SFunctionNode* pFunc = (SFunctionNode*)pRewrittenFunc;
pFunc->funcId = fmGetFuncId("concat");
snprintf(pFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (!pValue) code = TSDB_CODE_OUT_OF_MEMORY;
pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
pValue->datum.p = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + VARSTR_HEADER_SIZE);
if (!pValue->literal || !pValue->datum.p) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
pValue->translate = true;
pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
pValue->datum.p = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + VARSTR_HEADER_SIZE);
if (!pValue->literal || !pValue->datum.p) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (0 && code == TSDB_CODE_SUCCESS) {
sprintf(pValue->literal, "%s.%s", pTsma->pTsma->dbFName, pTsma->pTsma->name);
int32_t len = taosCreateMD5Hash(pValue->literal, strlen(pValue->literal));
pValue->literal[len] = '_';
strcpy(pValue->datum.p, pValue->literal);
pValue->node.resType.bytes = len + 1;
varDataSetLen(pValue->datum.p, pValue->node.resType.bytes);
strncpy(pValue->datum.p, pValue->literal, pValue->node.resType.bytes);
pRewrittenFunc->node.resType.bytes += pValue->node.resType.bytes;
if (code == TSDB_CODE_SUCCESS) {
code = nodesListMakeStrictAppend(&pFunc->pParameterList, (SNode*)pValue);
pValue = NULL;
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, (SNode*)pValue);
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListStrictAppend(pRewrittenFunc->pParameterList, *pTbNameNode);
code = nodesListStrictAppend(pFunc->pParameterList, *pTbNameNode);
}
}
@ -6233,7 +6222,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
*pTbNameNode = (SNode*)pRewrittenFunc;
} else {
nodesDestroyNode((SNode*)pRewrittenFunc);
nodesDestroyNode((SNode*)pValue);
if (pValue) nodesDestroyNode((SNode*)pValue);
}
return code;
@ -6327,7 +6316,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
strcpy(pNewScan->tableName.tname, pTsma->targetTbName);
}
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, false, true);
code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, true, true);
}
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNode(&pNewScan->pTagCond, pTsmaOptCtx, pTsma, true, true);

View File

@ -697,6 +697,36 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
return TSDB_CODE_SUCCESS;
}
int32_t md5Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t bufLen = TMAX(MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE + 1, pInputData->info.bytes);
char* pOutputBuf = taosMemoryMalloc(bufLen);
if (!pOutputBuf) {
qError("md5 function alloc memory failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataSetNULL(pOutputData, i);
continue;
}
char *input = colDataGetData(pInput[0].columnData, i);
if (bufLen < varDataLen(input) + VARSTR_HEADER_SIZE) {
bufLen = varDataLen(input) + VARSTR_HEADER_SIZE;
pOutputBuf = taosMemoryRealloc(pOutputBuf, bufLen);
}
char *output = pOutputBuf;
memcpy(varDataVal(output), varDataVal(input), varDataLen(input));
int32_t len = taosCreateMD5Hash(varDataVal(output), varDataLen(input));
varDataSetLen(output, len);
colDataSetVal(pOutputData, i, output, false);
}
pOutput->numOfRows = pInput->numOfRows;
taosMemoryFree(pOutputBuf);
return TSDB_CODE_SUCCESS;
}
/** Conversion functions **/
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int16_t inputType = GET_PARAM_TYPE(&pInput[0]);

View File

@ -97,7 +97,7 @@ endi
if $data42 != 3 then
return -1
endi
if $data52 != 30 then
if $data52 != 31 then
return -1
endi
if $data62 != 5 then

View File

@ -9,7 +9,7 @@ from util.dnodes import *
from util.common import *
# from tmqCommon import *
ROUND = 1000
ROUND = 2000
class TSMA:
def __init__(self):
@ -53,10 +53,7 @@ class UsedTsma:
def setIsTsma(self):
self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX)
if not self.is_tsma_:
pos = self.name.find('_') # for tsma output child table
if pos == 32:
self.is_tsma_ = True
self.is_tsma_ = len(self.name) == 32 # for tsma output child table
class TSMAQueryContext:
def __init__(self) -> None:
@ -110,6 +107,11 @@ class TSMAQCBuilder:
res = tdSql.queryResult[0][0]
return res.timestamp() * 1000
def md5(self, buf: str) -> str:
tdSql.query(f'select md5("{buf}")')
res = tdSql.queryResult[0][0]
return res
def should_query_with_table(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tb_name
@ -118,10 +120,11 @@ class TSMAQCBuilder:
used_tsma.is_tsma_ = False
self.qc_.used_tsmas.append(used_tsma)
return self
def should_query_with_tsma_ctb(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
def should_query_with_tsma_ctb(self, db_name: str, tsma_name: str, ctb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tb_name
name = f'1.{db_name}.{tsma_name}_{ctb_name}'
used_tsma.name = self.md5(name)
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = True
@ -711,7 +714,7 @@ class TDTestCase:
if ctx.has_tsma():
if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX:
break
elif ctx.used_tsmas[0].name.find('_') == 32 and ctx.used_tsmas[0].name[33:] == tb:
elif len(ctx.used_tsmas[0].name) == 32 and 1: ## select md5
break
else:
time.sleep(1)
@ -768,14 +771,14 @@ class TDTestCase:
self.test_query_child_table()
self.test_skip_tsma_hint()
self.test_long_tsma_name()
self.test_long_tb_name()
self.test_long_ctb_name()
self.test_add_tag_col()
self.test_modify_col_name_value()
def test_union(self):
ctxs = []
sql = 'select avg(c1) from meters union select avg(c1) from norm_tb'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('d2f2c89f2b3378a2a48b4cadf9c3f927_norm_tb').get_qc()
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('test', 'tsma5', 'norm_tb').get_qc()
ctxs.append(ctx)
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000" union select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
ctxs.append(TSMAQCBuilder().with_sql(sql)
@ -793,7 +796,7 @@ class TDTestCase:
tdSql.execute('insert into norm_tb values(now, 2)')
self.create_tsma('tsma_db2_norm_t', 'db2', 'norm_tb', ['avg(c2)', 'last(ts)'], '10m')
sql = 'select avg(c1) from test.meters union select avg(c2) from norm_tb'
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('e2d730bfc1242321c58c9ab7590ac060_norm_tb').get_qc()])
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('db2', 'tsma_db2_norm_t', 'norm_tb').get_qc()])
tdSql.execute('drop database db2')
tdSql.execute('use test')
@ -844,8 +847,19 @@ class TDTestCase:
tdSql.execute(f'drop tsma {name}')
def test_long_tb_name(self):
pass
def test_long_ctb_name(self):
tb_name = self.generate_random_string(192)
tsma_name = self.generate_random_string(178)
tdSql.execute('create database db2')
tdSql.execute('use db2')
tdSql.execute(f'create table {tb_name}(ts timestamp, c2 int)')
tdSql.execute(f'insert into {tb_name} values(now, 1)')
tdSql.execute(f'insert into {tb_name} values(now, 2)')
self.create_tsma(tsma_name, 'db2', tb_name, ['avg(c2)', 'last(ts)'], '10m')
sql = f'select avg(c2), last(ts) from {tb_name}'
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('db2', tsma_name, tb_name).get_qc()])
tdSql.execute('drop database db2')
tdSql.execute('use test')
def test_skip_tsma_hint(self):
ctxs = []
@ -858,12 +872,10 @@ class TDTestCase:
def test_query_child_table(self):
sql = 'select avg(c1) from t1'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
'e8945e7385834f8c22705546d4016539_t1', UsedTsma.TS_MIN, UsedTsma.TS_MAX, child_tb=True).get_qc()
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', 'tsma2', 't1', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
self.tsma_tester.check_sql(sql, ctx)
sql = 'select avg(c1) from t3'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
'e8945e7385834f8c22705546d4016539_t3', child_tb=True).get_qc()
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', 'tsma2', 't3').get_qc()
self.tsma_tester.check_sql(sql, ctx)
def test_recursive_tsma(self):
@ -1104,6 +1116,9 @@ class TDTestCase:
self.test_ddl()
self.test_query_with_tsma()
def test_ins_tsma(self):
pass
def test_create_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')