fix: keep group for interval query
This commit is contained in:
parent
16209644c4
commit
f74c488c42
|
@ -749,7 +749,7 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
|
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
|
||||||
bool isIntervalQuery, SAggSupporter* pSup);
|
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
|
||||||
// operator creater functions
|
// operator creater functions
|
||||||
// clang-format off
|
// clang-format off
|
||||||
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -195,9 +195,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
|
||||||
*/
|
*/
|
||||||
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
|
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
|
||||||
bool isIntervalQuery, SAggSupporter* pSup) {
|
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
|
||||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||||
|
if (!keepGroup) {
|
||||||
*(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
*(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
|
}
|
||||||
|
|
||||||
SResultRowPosition* p1 =
|
SResultRowPosition* p1 =
|
||||||
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
|
@ -1035,7 +1037,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||||
|
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
||||||
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
|
||||||
/*
|
/*
|
||||||
* not assign result buffer yet, add new result buffer
|
* not assign result buffer yet, add new result buffer
|
||||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||||
|
|
|
@ -918,7 +918,7 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
|
||||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow =
|
||||||
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false);
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
|
|
|
@ -580,7 +580,7 @@ void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SA
|
||||||
int64_t tid = 0;
|
int64_t tid = 0;
|
||||||
int64_t groupId = 0;
|
int64_t groupId = 0;
|
||||||
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
|
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
|
||||||
pTaskInfo, false, pSup);
|
pTaskInfo, false, pSup, true);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
|
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
|
||||||
|
|
|
@ -78,7 +78,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
|
||||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
|
||||||
masterscan, tableGroupId, pTaskInfo, true, pAggSup);
|
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
|
||||||
|
|
||||||
if (pResultRow == NULL) {
|
if (pResultRow == NULL) {
|
||||||
*pResult = NULL;
|
*pResult = NULL;
|
||||||
|
|
Loading…
Reference in New Issue