rewrite tbname for partition by tbname

This commit is contained in:
wangjiaming0909 2024-01-08 11:01:26 +08:00
parent 02e36d37fd
commit a438a77e78
3 changed files with 75 additions and 18 deletions

View File

@ -1448,7 +1448,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0; pCxt->pCreateStreamReq->igUpdate = 0;
// TODO what's this tiemstamp? // TODO what's this tiemstamp?
pCxt->pCreateStreamReq->lastTs = 1685959190000; pCxt->pCreateStreamReq->lastTs = 1704442278000;
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);

View File

@ -6158,14 +6158,69 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
return 0; return 0;
} }
static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode, bool withTsma) { static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0; int32_t code = 0;
if (withTsma) { SFunctionNode* pRewrittenFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
// TODO test child tbname too long SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
// if with tsma, we replace func tbname with concat(tbname, '') if (!pRewrittenFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY;
} else { if (code == TSDB_CODE_SUCCESS) {
// if no tsma, we replace func tbname with concat('dbname_tsmaname', tbname) pRewrittenFunc->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->translate = true;
} }
if (pTsma && code == TSDB_CODE_SUCCESS) {
// TODO test child tbname too long
// if with tsma, we replace func tbname with substr(tbname, 34)
pRewrittenFunc->funcId = fmGetFuncId("substr");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr(tbname, 34)");
pValue->node.resType.type = TSDB_DATA_TYPE_INT;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
pValue->literal = taosMemoryCalloc(1, 16);
pValue->datum.i = 34;
if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
sprintf(pValue->literal, "%d", 34);
code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, *pTbNameNode);
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListAppend(pRewrittenFunc->pParameterList, (SNode*)pValue);
}
} else if (code == TSDB_CODE_SUCCESS){
// if no tsma, we replace func tbname with concat('', tbname)
pRewrittenFunc->funcId = fmGetFuncId("concat");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat('', tbname)");
pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
pValue->datum.p = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + VARSTR_HEADER_SIZE);
if (!pValue->literal || !pValue->datum.p) code = TSDB_CODE_OUT_OF_MEMORY;
if (0 && code == TSDB_CODE_SUCCESS) {
sprintf(pValue->literal, "%s.%s", pTsma->pTsma->dbFName, pTsma->pTsma->name);
int32_t len = taosCreateMD5Hash(pValue->literal, strlen(pValue->literal));
pValue->literal[len] = '_';
strcpy(pValue->datum.p, pValue->literal);
pValue->node.resType.bytes = len + 1;
varDataSetLen(pValue->datum.p, pValue->node.resType.bytes);
strncpy(pValue->datum.p, pValue->literal, pValue->node.resType.bytes);
pRewrittenFunc->node.resType.bytes += pValue->node.resType.bytes;
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, (SNode*)pValue);
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListStrictAppend(pRewrittenFunc->pParameterList, *pTbNameNode);
}
}
if (code == TSDB_CODE_SUCCESS) {
*pTbNameNode = (SNode*)pRewrittenFunc;
} else {
nodesDestroyNode((SNode*)pRewrittenFunc);
nodesDestroyNode((SNode*)pValue);
}
return code; return code;
} }
@ -6183,11 +6238,11 @@ EDealRes tsmaOptRewriter(SNode** ppNode, void* ctx) {
struct TsmaOptRewriteCtx* pCtx = ctx; struct TsmaOptRewriteCtx* pCtx = ctx;
if (pCtx->rewriteTag && nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->colType == COLUMN_TYPE_TAG) { if (pCtx->rewriteTag && nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->colType == COLUMN_TYPE_TAG) {
code = tsmaOptRewriteTag(pCtx->pTsmaOptCtx, pCtx->pTsma, (SColumnNode*)pNode); code = tsmaOptRewriteTag(pCtx->pTsmaOptCtx, pCtx->pTsma, (SColumnNode*)pNode);
} else if (pCtx->rewriteTbname && nodeType(pNode) == QUERY_NODE_FUNCTION){ } else if (pCtx->rewriteTbname &&
SFunctionNode* pFunc = (SFunctionNode*)pNode; ((nodeType(pNode) == QUERY_NODE_FUNCTION && ((SFunctionNode*)pNode)->funcType == FUNCTION_TYPE_TBNAME) ||
if (pFunc->funcType == FUNCTION_TYPE_TBNAME) { (nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->colType == COLUMN_TYPE_TBNAME))) {
code = tsmaOptRewriteTbname(pCtx->pTsmaOptCtx, ppNode, pCtx->pTsma); code = tsmaOptRewriteTbname(pCtx->pTsmaOptCtx, ppNode, pCtx->pTsma);
} if (code == TSDB_CODE_SUCCESS) return DEAL_RES_IGNORE_CHILD;
} }
if (code) { if (code) {
pCtx->code = code; pCtx->code = code;
@ -6326,11 +6381,13 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode; SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode;
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode; SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
if (fmIsGroupKeyFunc(pAggFunc->funcId)) { if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
ASSERT(pAggFunc->pParameterList->length == 1); struct TsmaOptRewriteCtx ctx = {
SNode* pParamNode = pAggFunc->pParameterList->pHead->pNode; .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
if (nodeType(pParamNode) == QUERY_NODE_COLUMN) { nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx);
SColumnNode* pTagCol = (SColumnNode*)pParamNode; if (ctx.code) {
if (pTagCol->colType == COLUMN_TYPE_TAG) tsmaOptRewriteTag(pTsmaOptCtx, pTsma, pTagCol); code = ctx.code;
} else {
REPLACE_LIST2_NODE(pAggFuncNode);
} }
continue; continue;
} else if (fmIsPseudoColumnFunc(pStateFunc->funcId)) { } else if (fmIsPseudoColumnFunc(pStateFunc->funcId)) {

View File

@ -958,7 +958,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) && if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) &&
(pInfo->pSubplan->pChildren && LIST_LENGTH(pInfo->pSubplan->pChildren) > 0)) { (pInfo->pSubplan->pChildren && LIST_LENGTH(pInfo->pSubplan->pChildren) == 0)) {
return stbSplSplitWindowForPartTable(pCxt, pInfo); return stbSplSplitWindowForPartTable(pCxt, pInfo);
} else { } else {
return stbSplSplitWindowForCrossTable(pCxt, pInfo); return stbSplSplitWindowForCrossTable(pCxt, pInfo);