This commit is contained in:
wangjiaming0909 2023-11-24 17:16:59 +08:00
parent 6324d1293a
commit 08fc9170f5
2 changed files with 39 additions and 33 deletions

View File

@ -1376,29 +1376,27 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
taosMemoryFree(p);
}
static void initSMAObj(SSmaObj *pSma, const SMCreateSmaReq *pCreateReq, const SStbObj *pStb, const SDbObj *pDb) {
memcpy(pSma->name, pCreateReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pSma->stb, pStb->name, TSDB_TABLE_FNAME_LEN);
memcpy(pSma->db, pDb->name, TSDB_DB_FNAME_LEN);
pSma->createdTime = taosGetTimestampMs();
pSma->uid = mndGenerateUid(pCreateReq->name, TSDB_TABLE_FNAME_LEN);
static void initSMAObj(SCreateTSMACxt* pCxt) {
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->stb, pCxt->pSrcStb->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
pCxt->pSma->createdTime = taosGetTimestampMs();
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreateReq->name);
memcpy(pSma->dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
pSma->dstTbUid = mndGenerateUid(pSma->dstTbName, TSDB_TABLE_FNAME_LEN);
pSma->stbUid = pStb->uid;
pSma->dbUid = pDb->uid;
pSma->interval = pCreateReq->interval;
pSma->intervalUnit = pCreateReq->intervalUnit;
pSma->timezone = tsTimezone;
memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->dstTbUid = mndGenerateUid(pCxt->pSma->dstTbName, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->stbUid = pCxt->pSrcStb->uid;
pCxt->pSma->dbUid = pCxt->pDb->uid;
pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
pCxt->pSma->timezone = tsTimezone;
pSma->exprLen = pCreateReq->exprLen;
pSma->sqlLen = pCreateReq->sqlLen;
pSma->astLen = pCreateReq->astLen;
pSma->expr = pCreateReq->expr;
pSma->sql = pCreateReq->sql;
pSma->ast = pCreateReq->ast;
pCxt->pSma->exprLen = pCxt->pCreateSmaReq->exprLen;
pCxt->pSma->sqlLen = pCxt->pCreateSmaReq->sqlLen;
pCxt->pSma->astLen = pCxt->pCreateSmaReq->astLen;
pCxt->pSma->expr = pCxt->pCreateSmaReq->expr;
pCxt->pSma->sql = pCxt->pCreateSmaReq->sql;
pCxt->pSma->ast = pCxt->pCreateSmaReq->ast;
}
static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMCreateSmaReq *pCreateReq,
@ -1540,7 +1538,7 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
SMDropStreamReq dropStreamReq = {0};
pCxt->pSma = &sma;
initSMAObj(&sma, pCxt->pCreateSmaReq, pCxt->pSrcStb, pCxt->pDb);
initSMAObj(pCxt);
pCxt->pCreateStreamReq = &createStreamReq;
pCxt->pDropStreamReq = &dropStreamReq;
mndCreateTSMABuildCreateStreamReq(pCxt);
@ -1560,6 +1558,13 @@ _OVER:
return code;
}
static void mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, char* targetStbName) {
SName smaName;
tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s_tsma_res_stb", tsmaName);
}
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
#ifdef WINDOWS
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
@ -1592,10 +1597,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
SName smaName;
tNameFromString(&smaName, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", createReq.name);
mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
if (pTargetStb) {
@ -1740,10 +1742,7 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
SName smaName;
tNameFromString(&smaName, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", dropReq.name);
mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
SStbObj* pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

View File

@ -7456,6 +7456,7 @@ static int32_t addWstartToSampleProjects(SNodeList* pProjectionList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wstart");
strcpy(pFunc->node.userAlias, "_wstart");
return nodesListPushFront(pProjectionList, (SNode*)pFunc);
}
@ -7465,6 +7466,7 @@ static int32_t addWendToSampleProjects(SNodeList* pProjectionList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wend");
strcpy(pFunc->node.userAlias, "_wend");
return nodesListAppend(pProjectionList, (SNode*)pFunc);
}
@ -7474,6 +7476,7 @@ static int32_t addWdurationToSampleProjects(SNodeList* pProjectionList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wduration");
strcpy(pFunc->node.userAlias, "_wduration");
return nodesListAppend(pProjectionList, (SNode*)pFunc);
}
@ -10377,9 +10380,10 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
if (TSDB_CODE_SUCCESS == code) {
// append partition by tbname
SNode* pTbnameFunc = createTbnameFunction();
SFunctionNode* pTbnameFunc = (SFunctionNode*)createTbnameFunction();
if (pTbnameFunc) {
nodesListMakeAppend(&info.pPartitionByList, pTbnameFunc);
sprintf(pTbnameFunc->node.userAlias, "tbname");
nodesListMakeAppend(&info.pPartitionByList, (SNode*)pTbnameFunc);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
@ -10405,7 +10409,7 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
return code;
}
static char* defaultTSMAFuncs[4] = {"MAX", "MIN", "SUM", "COUNT"};
static char* defaultTSMAFuncs[4] = {"max", "min", "sum", "count"};
static int32_t
translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta* pTableMeta) {
@ -10456,6 +10460,9 @@ translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta*
nodesDestroyList(pTSMAFuncs);
return TSDB_CODE_OUT_OF_MEMORY;
}
// TODO what if exceeds the max size
snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s(%s)", pFunc->functionName,
((SColumnNode*)pNode2)->colName);
}
}
nodesDestroyList(pStmt->pOptions->pFuncs);