[td-2859]
This commit is contained in:
parent
d54ea3c297
commit
e5d17c95fa
|
@ -1837,7 +1837,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel
|
|||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
SSDataBlock* doMultiwaySort(void* param, bool* newgroup) {
|
||||
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -2043,8 +2043,15 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
|||
|
||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) {
|
||||
STimeWindow* w = &pRes->info.window;
|
||||
|
||||
// TODO in case of desc order, swap it
|
||||
w->skey = *(int64_t*)pInfoData->pData;
|
||||
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1));
|
||||
|
||||
if (pOperator->pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_DESC) {
|
||||
SWAP(w->skey, w->ekey, TSKEY);
|
||||
assert(w->skey <= w->ekey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1599,7 +1599,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
|
||||
tscCreateResPointerInfo(pRes, pQueryInfo);
|
||||
tscSetResRawPtrRv(pRes, pQueryInfo, p);
|
||||
// tscSetResRawPtr(pRes, pQueryInfo);
|
||||
}
|
||||
|
||||
pRes->row = 0;
|
||||
|
|
|
@ -3591,7 +3591,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
|
|||
STsBufInfo bufInfo = {0};
|
||||
SQueryParam param = {.pOperator = pa};
|
||||
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0, merger);
|
||||
// pQInfo->runtimeEnv.proot->upstream = pSourceOperator;
|
||||
return pQInfo;
|
||||
|
||||
_cleanup:
|
||||
|
|
|
@ -3282,7 +3282,6 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
|
|||
for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) {
|
||||
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
|
||||
SExprInfo* pExpr = pField->pExpr;
|
||||
// SExprInfo *pExpr = &pQueryAttr->pExpr3[i];
|
||||
|
||||
SSqlExpr *pse = &pQueryAttr->pExpr2[i].base;
|
||||
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
|
@ -3304,8 +3303,17 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
|
|||
pse->resBytes = pExpr->base.resBytes;
|
||||
|
||||
// TODO restore refactor
|
||||
int32_t functionId = pExpr->base.functionId;
|
||||
if (pExpr->base.functionId == TSDB_FUNC_FIRST_DST) {
|
||||
functionId = TSDB_FUNC_FIRST;
|
||||
} else if (pExpr->base.functionId == TSDB_FUNC_LAST_DST) {
|
||||
functionId = TSDB_FUNC_LAST;
|
||||
} else if (pExpr->base.functionId == TSDB_FUNC_STDDEV_DST) {
|
||||
functionId = TSDB_FUNC_STDDEV;
|
||||
}
|
||||
|
||||
int32_t inter = 0;
|
||||
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, 0, &pse->resType,
|
||||
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType,
|
||||
&pse->resBytes, &inter, 0, false);
|
||||
pse->colType = pse->resType;
|
||||
pse->colBytes = pse->resBytes;
|
||||
|
|
|
@ -499,7 +499,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||
|
||||
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
|
||||
SSDataBlock* doMultiwaySort(void* param, bool* newgroup);
|
||||
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
|
||||
SSDataBlock* doSLimit(void* param, bool* newgroup);
|
||||
|
||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||
|
|
|
@ -954,6 +954,12 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
|
|||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].startTs = pQueryAttr->window.skey;
|
||||
|
||||
// Always set the asc order for merge stage process
|
||||
if (pCtx[k].currentStage == MERGE_STAGE) {
|
||||
pCtx[k].order = TSDB_ORDER_ASC;
|
||||
}
|
||||
|
||||
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
|
||||
}
|
||||
}
|
||||
|
@ -4500,7 +4506,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
|
||||
pOperator->exec = doMultiwaySort;
|
||||
pOperator->exec = doMultiwayMergeSort;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ if $rows != 300 then
|
|||
endi
|
||||
|
||||
if $data00 != @70-01-01 08:01:40.990@ then
|
||||
print expect 0, actual: $data00
|
||||
print expect 70-01-01 08:01:40.990, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue