From e5d17c95faaf53482a631a949eef7b829d7cf4e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Apr 2021 14:56:30 +0800 Subject: [PATCH] [td-2859] --- src/client/src/tscLocalMerge.c | 9 ++++++++- src/client/src/tscServer.c | 1 - src/client/src/tscSubquery.c | 1 - src/client/src/tscUtil.c | 12 ++++++++++-- src/query/inc/qExecutor.h | 2 +- src/query/src/qExecutor.c | 8 +++++++- tests/script/general/parser/join_multivnode.sim | 2 +- 7 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 064d793a9c..86e5349a6f 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1837,7 +1837,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel pBlock->info.rows += 1; } -SSDataBlock* doMultiwaySort(void* param, bool* newgroup) { +SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2043,8 +2043,15 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) { STimeWindow* w = &pRes->info.window; + + // TODO in case of desc order, swap it w->skey = *(int64_t*)pInfoData->pData; w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1)); + + if (pOperator->pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_DESC) { + SWAP(w->skey, w->ekey, TSKEY); + assert(w->skey <= w->ekey); + } } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 24690366c4..5590c9449f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1599,7 +1599,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { tscCreateResPointerInfo(pRes, pQueryInfo); tscSetResRawPtrRv(pRes, pQueryInfo, p); -// tscSetResRawPtr(pRes, pQueryInfo); } pRes->row = 0; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3a09a8ad9b..da743e1597 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3591,7 +3591,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0, merger); -// pQInfo->runtimeEnv.proot->upstream = pSourceOperator; return pQInfo; _cleanup: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index da310beb49..b9c68ebdce 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3282,7 +3282,6 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) { SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SExprInfo* pExpr = pField->pExpr; -// SExprInfo *pExpr = &pQueryAttr->pExpr3[i]; SSqlExpr *pse = &pQueryAttr->pExpr2[i].base; pse->uid = pTableMetaInfo->pTableMeta->id.uid; @@ -3304,8 +3303,17 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf pse->resBytes = pExpr->base.resBytes; // TODO restore refactor + int32_t functionId = pExpr->base.functionId; + if (pExpr->base.functionId == TSDB_FUNC_FIRST_DST) { + functionId = TSDB_FUNC_FIRST; + } else if (pExpr->base.functionId == TSDB_FUNC_LAST_DST) { + functionId = TSDB_FUNC_LAST; + } else if (pExpr->base.functionId == TSDB_FUNC_STDDEV_DST) { + functionId = TSDB_FUNC_STDDEV; + } + int32_t inter = 0; - getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, 0, &pse->resType, + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter, 0, false); pse->colType = pse->resType; pse->colBytes = pse->resBytes; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 9b9a361bf5..e7ff0f32ed 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -499,7 +499,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); -SSDataBlock* doMultiwaySort(void* param, bool* newgroup); +SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doSLimit(void* param, bool* newgroup); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 504b1b5169..d28c04bc24 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -954,6 +954,12 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pQueryAttr->window.skey; + + // Always set the asc order for merge stage process + if (pCtx[k].currentStage == MERGE_STAGE) { + pCtx[k].order = TSDB_ORDER_ASC; + } + aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); } } @@ -4500,7 +4506,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->exec = doMultiwaySort; + pOperator->exec = doMultiwayMergeSort; return pOperator; } diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 2322496a94..c72b2c5b3e 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -142,7 +142,7 @@ if $rows != 300 then endi if $data00 != @70-01-01 08:01:40.990@ then - print expect 0, actual: $data00 + print expect 70-01-01 08:01:40.990, actual: $data00 return -1 endi