fix(query): add attribute to control output order.
This commit is contained in:
parent
d7a525f21f
commit
915e4e40c7
|
@ -524,6 +524,7 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
bool timeWindowInterpo; // interpolation needed or not
|
||||
SArray* pInterpCols; // interpolation columns
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
int32_t resultTsOrder; // result timestamp order
|
||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
STimeWindowAggSupp twAggSup;
|
||||
bool invertible;
|
||||
|
|
|
@ -74,11 +74,12 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataB
|
|||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
const char* id);
|
||||
|
||||
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
||||
void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order);
|
||||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
|
||||
|
||||
|
|
|
@ -3234,6 +3234,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
taosFillSetDataOrderInfo(pInfo->pFillInfo, TSDB_ORDER_ASC);
|
||||
|
||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
|
@ -3592,9 +3594,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
pInfo->pFillInfo =
|
||||
taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id);
|
||||
taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id);
|
||||
|
||||
pInfo->win = win;
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
|
|
|
@ -433,7 +433,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||
int32_t primaryTsSlotId, const char* id) {
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
|
@ -447,9 +447,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
|
|||
}
|
||||
|
||||
pFillInfo->tsSlotId = primaryTsSlotId;
|
||||
|
||||
taosResetFillInfo(pFillInfo, skey);
|
||||
pFillInfo->order = order;
|
||||
|
||||
switch (fillType) {
|
||||
case FILL_MODE_NONE:
|
||||
|
@ -535,6 +533,14 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) {
|
||||
if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) {
|
||||
return;
|
||||
}
|
||||
|
||||
pFillInfo->order = order;
|
||||
}
|
||||
|
||||
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
|
||||
if (pFillInfo->type == TSDB_FILL_NONE) {
|
||||
return;
|
||||
|
|
|
@ -1096,7 +1096,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
|
||||
}
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
@ -1249,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
|
@ -1791,6 +1790,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->resultTsOrder = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
|
@ -1807,7 +1807,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
}
|
||||
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
|
Loading…
Reference in New Issue