Merge branch '3.0' into feature/stream

This commit is contained in:
Liu Jicong 2022-06-17 16:38:47 +08:00
commit 7783a103bc
19 changed files with 156 additions and 56 deletions

View File

@ -228,6 +228,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,

View File

@ -130,13 +130,17 @@ typedef struct SMergeLogicNode {
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EIntervalAlgorithm {
typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1,
INTERVAL_ALGO_MERGE,
INTERVAL_ALGO_STREAM_FINAL,
INTERVAL_ALGO_STREAM_SEMI,
INTERVAL_ALGO_STREAM_SINGLE,
} EIntervalAlgorithm;
SESSION_ALGO_STREAM_SEMI,
SESSION_ALGO_STREAM_FINAL,
SESSION_ALGO_STREAM_SINGLE,
SESSION_ALGO_MERGE,
} EWindowAlgorithm;
typedef struct SWindowLogicNode {
SLogicNode node;
@ -153,7 +157,7 @@ typedef struct SWindowLogicNode {
int8_t triggerType;
int64_t watermark;
double filesFactor;
EIntervalAlgorithm intervalAlgo;
EWindowAlgorithm windowAlgo;
} SWindowLogicNode;
typedef struct SFillLogicNode {
@ -371,6 +375,8 @@ typedef struct SSessionWinodwPhysiNode {
} SSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window;

View File

@ -761,11 +761,12 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false);
char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0};
char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b, false);
taosMemoryFreeClear(b);
numOfRows++;
sdbRelease(pSdb, pDnode);

View File

@ -845,8 +845,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);

View File

@ -4728,18 +4728,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pTaskInfo);
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
int32_t children = 0;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
int32_t children = 1;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {

View File

@ -2425,12 +2425,15 @@ int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, Sql
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
@ -2453,12 +2456,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
}
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
pInfo->twAggSup = *pTwAggSupp;
pInfo->twAggSup = (STimeWindowAggSupp) {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN};
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId;
pInfo->gap = gap;
pInfo->gap = pSessionNode->gap;
pInfo->binfo.pRes = pResBlock;
pInfo->order = TSDB_ORDER_ASC;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -2960,25 +2965,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = NULL;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, numOfCols, pResBlock, gap,
tsSlotId, pTwAggSupp, pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
if (pOperator == NULL) {
goto _error;
}
pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan
pInfo = pOperator->info;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild =
createStreamSessionAggOperatorInfo(NULL, pExprInfo, numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo);
createStreamSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo);
if (pChild == NULL) {
goto _error;
}
@ -2988,7 +2988,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
destroyStreamSessionAggOperatorInfo(pInfo, pOperator->numOfExprs);
}
taosMemoryFreeClear(pInfo);

View File

@ -427,7 +427,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(intervalAlgo);
COPY_SCALAR_FIELD(windowAlgo);
return (SNode*)pDst;
}
@ -538,6 +538,12 @@ static SNode* physiIntervalCopy(const SIntervalPhysiNode* pSrc, SIntervalPhysiNo
return (SNode*)pDst;
}
static SNode* physiSessionCopy(const SSessionWinodwPhysiNode* pSrc, SSessionWinodwPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(window, physiWindowCopy);
COPY_SCALAR_FIELD(gap);
return (SNode*)pDst;
}
static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId);
CLONE_NODE_LIST_FIELD(pSlots);
@ -678,6 +684,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst);
default:
break;
}

View File

@ -246,6 +246,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "PhysiStreamSemiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "PhysiStreamFinalSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
@ -3998,6 +4002,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
@ -4131,6 +4137,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:

View File

@ -289,6 +289,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
@ -806,6 +810,7 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;

View File

@ -285,7 +285,7 @@ class MockCatalogServiceImpl {
}
void createSmaIndex(const SMCreateSmaReq* pReq) {
STableIndexInfo info;
STableIndexInfo info = {0};
info.intervalUnit = pReq->intervalUnit;
info.slidingUnit = pReq->slidingUnit;
info.interval = pReq->interval;

View File

@ -548,6 +548,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol);
if (NULL == pWindow->pTspk) {
@ -572,7 +573,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->intervalAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {

View File

@ -977,8 +977,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
return code;
}
static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
switch (intervalAlgo) {
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
switch (windowAlgo) {
case INTERVAL_ALGO_HASH:
return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
case INTERVAL_ALGO_MERGE:
@ -989,6 +989,14 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
case INTERVAL_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
case SESSION_ALGO_STREAM_FINAL:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
case SESSION_ALGO_STREAM_SEMI:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
case SESSION_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
case SESSION_ALGO_MERGE:
return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
default:
break;
}
@ -998,7 +1006,7 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->intervalAlgo));
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
if (NULL == pInterval) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -1015,8 +1023,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -376,8 +376,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
@ -400,8 +400,8 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL;
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
@ -421,8 +421,29 @@ static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo)
}
}
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitSessionForStream(pCxt, pInfo);
} else {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
@ -537,10 +558,12 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
SNode* pNode = NULL;
FOREACH(pNode, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
SExprNode* pSortExpr = (SExprNode*)pSortKey->pExpr;
SNode* pTarget = NULL;
bool found = false;
FOREACH(pTarget, pTargets) {
if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
(0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
if (TSDB_CODE_SUCCESS != code) {
break;
@ -549,7 +572,7 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
}
}
if (TSDB_CODE_SUCCESS == code && !found) {
SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
SNode* pCol = stbSplCreateColumnNode(pSortExpr);
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pTargets, pCol);

View File

@ -28,6 +28,8 @@ TEST_F(PlanOrderByTest, basic) {
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
run("SELECT c1 AS a FROM t1 ORDER BY a");
run("SELECT c1 + 10 AS a FROM t1 ORDER BY a");
}
@ -59,4 +61,6 @@ TEST_F(PlanOrderByTest, stable) {
run("SELECT c2 FROM st1 ORDER BY c1");
run("SELECT c2 FROM st1 PARTITION BY c2 ORDER BY c1");
run("SELECT c1 AS a FROM st1 ORDER BY a");
}

View File

@ -72,6 +72,9 @@ int32_t taosEnvToCfg(const char *envStr, char *cfgStr) {
if (cfgNameLen > 0) {
memcpy(cfgStr, buf, cfgNameLen);
memset(&cfgStr[cfgNameLen], ' ', p - cfgStr - cfgNameLen + 1);
} else {
*cfgStr = '\0';
return -1;
}
}
return strlen(cfgStr);

View File

@ -0,0 +1,23 @@
@echo off
echo Executing copy_udf.bat
set SCRIPT_DIR=%cd%
echo SCRIPT_DIR: %SCRIPT_DIR%
cd ..\..\..
set TAOS_DIR=%cd%
echo find udf library in %TAOS_DIR%
set UDF1_DIR=%TAOS_DIR%\debug\build\lib\udf1.dll
set UDF2_DIR=%TAOS_DIR%\debug\build\lib\udf2.dll
echo %UDF1_DIR%
echo %UDF2_DIR%
set UDF_TMP=C:\Windows\Temp\udf
rm -rf %UDF_TMP%
mkdir %UDF_TMP%
echo Copy udf shared library files to %UDF_TMP%
cp %UDF1_DIR% %UDF_TMP%
cp %UDF2_DIR% %UDF_TMP%

View File

@ -19,8 +19,14 @@ sql show databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
system_content printf %OS%
if $system_content == Windows_NT then
sql create function udf1 as 'C:\\Windows\\Temp\\udf1.dll' outputtype int bufSize 8;
sql create aggregate function udf2 as 'C:\\Windows\\Temp\\udf2.dll' outputtype double bufSize 8;
else
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
endi
sql show functions;
if $rows != 2 then
return -1

View File

@ -458,11 +458,17 @@ bool simExecuteSystemContentCmd(SScript *script, char *option) {
char buf[4096] = {0};
char buf1[4096 + 512] = {0};
char filename[400] = {0};
sprintf(filename, "%s/%s.tmp", simScriptDir, script->fileName);
sprintf(filename, "%s" TD_DIRSEP "%s.tmp", simScriptDir, script->fileName);
#ifdef WINDOWS
sprintf(buf, "cd %s && ", simScriptDir);
simVisuallizeOption(script, option, buf + strlen(buf));
sprintf(buf1, "%s > %s 2>nul", buf, filename);
#else
sprintf(buf, "cd %s; ", simScriptDir);
simVisuallizeOption(script, option, buf + strlen(buf));
sprintf(buf1, "%s > %s 2>/dev/null", buf, filename);
#endif
sprintf(script->system_exit_code, "%d", system(buf1));
simStoreSystemContentResult(script, filename);

View File

@ -206,7 +206,7 @@ SScript *simParseScript(char *fileName) {
for (int32_t i = 0; i < cmdlen; ++i) {
if (buffer[i] == '\r' || buffer[i] == '\n') {
buffer[i] = ' ';
buffer[i] = '\0';
}
}