diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 272db1fab5..05a8a54c11 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -299,6 +299,7 @@ int32_t tsS3UploadDelaySec = 60; bool tsExperimental = true; int32_t tsMaxTsmaNum = 8; +int32_t tsMaxTsmaCalcDelay = 600; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 5aca786e4f..62e4f1c5a6 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -68,13 +68,14 @@ typedef struct SCreateTSMACxt { const SMDropSmaReq * pDropSmaReq; }; SDbObj *pDb; - SStbObj * pSrcStb; - SSmaObj * pSma; - const SSmaObj * pBaseSma; + SStbObj *pSrcStb; + SSmaObj *pSma; + const SSmaObj *pBaseSma; SCMCreateStreamReq *pCreateStreamReq; - SMDropStreamReq * pDropStreamReq; - const char * streamName; - const char * targetStbFullName; + SMDropStreamReq *pDropStreamReq; + const char *streamName; + const char *targetStbFullName; + SNodeList *pProjects; } SCreateTSMACxt; int32_t mndInitSma(SMnode *pMnode) { @@ -1477,6 +1478,17 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { f.type = TSDB_DATA_TYPE_BINARY; tstrncpy(f.name, "tbname", strlen("tbname") + 1); taosArrayPush(pCxt->pCreateStreamReq->pTags, &f); + + // construct output cols + SNode* pNode; + FOREACH(pNode, pCxt->pProjects) { + SExprNode* pExprNode = (SExprNode*)pNode; + f.bytes = pExprNode->resType.bytes; + f.type = pExprNode->resType.type; + f.flags = COL_SMA_ON; + strcpy(f.name, pExprNode->aliasName); + taosArrayPush(pCxt->pCreateStreamReq->pCols, &f); + } } static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) { @@ -1584,11 +1596,28 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) { pCxt->pSma = &sma; initSMAObj(pCxt); + + SNodeList* pProjects = NULL; + terrno = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects); + if (TSDB_CODE_SUCCESS != terrno) { + code = -1; + goto _OVER; + } + pCxt->pProjects = pProjects; + pCxt->pCreateStreamReq = &createStreamReq; if (pCxt->pCreateSmaReq->pVgroupVerList) { pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL); if (!pCxt->pCreateStreamReq->pVgroupVerList) { - errno = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + } + if (LIST_LENGTH(pProjects) > 0) { + createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField)); + if (!createStreamReq.pCols) { + terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; goto _OVER; } @@ -1610,6 +1639,8 @@ _OVER: tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq); if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq); pCxt->pCreateStreamReq = NULL; + if (pProjects) nodesDestroyList(pProjects); + pCxt->pProjects = NULL; return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b6d171d203..76ec4da24c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5037,12 +5037,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; - int64_t ts = 0; if (asc) { - ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey; + ts = (pReader->info.window.skey > INT64_MIN) ? pReader->info.window.skey - 1 : pReader->info.window.skey; } else { - ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey; + ts = (pReader->info.window.ekey < INT64_MAX) ? pReader->info.window.ekey + 1 : pReader->info.window.ekey; } resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index fc94d5c2ca..394eecd542 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1899,8 +1899,9 @@ static int32_t translateFirstLastState(SFunctionNode* pFunc, char* pErrBuf, int3 SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes; + int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; pFunc->node.resType = - (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + (SDataType){.bytes = getFirstLastInfoSize(paraBytes, pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; return TSDB_CODE_SUCCESS; } @@ -4042,7 +4043,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_first_state", .type = FUNCTION_TYPE_FIRST_STATE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastState, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -4055,7 +4056,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_first_state_merge", .type = FUNCTION_TYPE_FIRST_STATE_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastStateMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -4066,7 +4067,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_state", .type = FUNCTION_TYPE_LAST_STATE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastState, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -4079,7 +4080,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_state_merge", .type = FUNCTION_TYPE_LAST_STATE_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastStateMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup,