merge with 3.0

This commit is contained in:
wangjiaming0909 2024-04-15 13:02:40 +08:00
parent a55d5c5e14
commit 1978a06e6e
4 changed files with 47 additions and 15 deletions

View File

@ -299,6 +299,7 @@ int32_t tsS3UploadDelaySec = 60;
bool tsExperimental = true;
int32_t tsMaxTsmaNum = 8;
int32_t tsMaxTsmaCalcDelay = 600;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {

View File

@ -68,13 +68,14 @@ typedef struct SCreateTSMACxt {
const SMDropSmaReq * pDropSmaReq;
};
SDbObj *pDb;
SStbObj * pSrcStb;
SSmaObj * pSma;
const SSmaObj * pBaseSma;
SStbObj *pSrcStb;
SSmaObj *pSma;
const SSmaObj *pBaseSma;
SCMCreateStreamReq *pCreateStreamReq;
SMDropStreamReq * pDropStreamReq;
const char * streamName;
const char * targetStbFullName;
SMDropStreamReq *pDropStreamReq;
const char *streamName;
const char *targetStbFullName;
SNodeList *pProjects;
} SCreateTSMACxt;
int32_t mndInitSma(SMnode *pMnode) {
@ -1477,6 +1478,17 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
f.type = TSDB_DATA_TYPE_BINARY;
tstrncpy(f.name, "tbname", strlen("tbname") + 1);
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
// construct output cols
SNode* pNode;
FOREACH(pNode, pCxt->pProjects) {
SExprNode* pExprNode = (SExprNode*)pNode;
f.bytes = pExprNode->resType.bytes;
f.type = pExprNode->resType.type;
f.flags = COL_SMA_ON;
strcpy(f.name, pExprNode->aliasName);
taosArrayPush(pCxt->pCreateStreamReq->pCols, &f);
}
}
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
@ -1584,11 +1596,28 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
pCxt->pSma = &sma;
initSMAObj(pCxt);
SNodeList* pProjects = NULL;
terrno = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
if (TSDB_CODE_SUCCESS != terrno) {
code = -1;
goto _OVER;
}
pCxt->pProjects = pProjects;
pCxt->pCreateStreamReq = &createStreamReq;
if (pCxt->pCreateSmaReq->pVgroupVerList) {
pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
if (!pCxt->pCreateStreamReq->pVgroupVerList) {
errno = TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
}
if (LIST_LENGTH(pProjects) > 0) {
createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField));
if (!createStreamReq.pCols) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
@ -1610,6 +1639,8 @@ _OVER:
tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
pCxt->pCreateStreamReq = NULL;
if (pProjects) nodesDestroyList(pProjects);
pCxt->pProjects = NULL;
return code;
}

View File

@ -5037,12 +5037,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1;
int64_t ts = 0;
if (asc) {
ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey;
ts = (pReader->info.window.skey > INT64_MIN) ? pReader->info.window.skey - 1 : pReader->info.window.skey;
} else {
ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey;
ts = (pReader->info.window.ekey < INT64_MAX) ? pReader->info.window.ekey + 1 : pReader->info.window.ekey;
}
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);

View File

@ -1899,8 +1899,9 @@ static int32_t translateFirstLastState(SFunctionNode* pFunc, char* pErrBuf, int3
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes;
int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
pFunc->node.resType =
(SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
(SDataType){.bytes = getFirstLastInfoSize(paraBytes, pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
@ -4042,7 +4043,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "_first_state",
.type = FUNCTION_TYPE_FIRST_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateFirstLastState,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -4055,7 +4056,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "_first_state_merge",
.type = FUNCTION_TYPE_FIRST_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateFirstLastStateMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -4066,7 +4067,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "_last_state",
.type = FUNCTION_TYPE_LAST_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateFirstLastState,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -4079,7 +4080,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "_last_state_merge",
.type = FUNCTION_TYPE_LAST_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateFirstLastStateMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,