fix group_key not set properly
This commit is contained in:
parent
76eea9f7c4
commit
42df9f587f
|
@ -39,6 +39,7 @@ typedef struct STimeSliceOperatorInfo {
|
||||||
int64_t prevTs;
|
int64_t prevTs;
|
||||||
bool prevTsSet;
|
bool prevTsSet;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
|
SSDataBlock* pCurrentGroupRes;
|
||||||
SSDataBlock* pNextGroupRes;
|
SSDataBlock* pNextGroupRes;
|
||||||
} STimeSliceOperatorInfo;
|
} STimeSliceOperatorInfo;
|
||||||
|
|
||||||
|
@ -197,18 +198,24 @@ static bool isInterpFunc(SExprInfo* pExprInfo) {
|
||||||
return (strcasecmp(name, "interp") == 0);
|
return (strcasecmp(name, "interp") == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
|
||||||
|
char *name = pExprInfo->pExpr->_function.functionName;
|
||||||
|
return (strcasecmp(name, "_group_key") == 0);
|
||||||
|
}
|
||||||
|
|
||||||
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
|
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
|
||||||
bool beforeTs) {
|
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
|
||||||
int32_t rows = pResBlock->info.rows;
|
int32_t rows = pResBlock->info.rows;
|
||||||
timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
|
timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
|
||||||
// todo set the correct primary timestamp column
|
// todo set the correct primary timestamp column
|
||||||
|
|
||||||
|
|
||||||
// output the result
|
// output the result
|
||||||
bool hasInterp = true;
|
bool hasInterp = true;
|
||||||
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
|
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
|
||||||
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
|
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
|
||||||
|
|
||||||
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
||||||
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
||||||
|
|
||||||
if (isIrowtsPseudoColumn(pExprInfo)) {
|
if (isIrowtsPseudoColumn(pExprInfo)) {
|
||||||
|
@ -219,6 +226,18 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
|
colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
|
||||||
continue;
|
continue;
|
||||||
} else if (!isInterpFunc(pExprInfo)) {
|
} else if (!isInterpFunc(pExprInfo)) {
|
||||||
|
if (isGroupKeyFunc(pExprInfo)) {
|
||||||
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, index)) {
|
||||||
|
colDataSetNULL(pDst, pResBlock->info.rows);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* v = colDataGetData(pSrc, index);
|
||||||
|
colDataSetVal(pDst, pResBlock->info.rows, v, false);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +364,7 @@ static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp*
|
||||||
bool isFilled = false;
|
bool isFilled = false;
|
||||||
colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
|
colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
|
||||||
} else {
|
} else {
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, index)) {
|
if (colDataIsNull_s(pSrc, index)) {
|
||||||
|
@ -559,7 +578,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
|
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
|
||||||
if (nextTs > pSliceInfo->current) {
|
if (nextTs > pSliceInfo->current) {
|
||||||
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) &&
|
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
|
||||||
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -583,7 +602,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
||||||
|
|
||||||
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) &&
|
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
|
||||||
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -608,13 +627,14 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator) {
|
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
|
||||||
|
int32_t index) {
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
|
||||||
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
|
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
|
||||||
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
||||||
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false);
|
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false);
|
||||||
pSliceInfo->current =
|
pSliceInfo->current =
|
||||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
}
|
}
|
||||||
|
@ -645,7 +665,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pSliceInfo->pNextGroupRes != NULL) {
|
if (pSliceInfo->pNextGroupRes != NULL) {
|
||||||
|
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
|
||||||
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
|
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
|
||||||
|
pSliceInfo->pCurrentGroupRes = pSliceInfo->pNextGroupRes;
|
||||||
pSliceInfo->pNextGroupRes = NULL;
|
pSliceInfo->pNextGroupRes = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -679,11 +701,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||||
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
|
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
|
||||||
|
pSliceInfo->pCurrentGroupRes = pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if need to interpolate after last datablock
|
// check if need to interpolate after last datablock
|
||||||
// except for fill(next), fill(linear)
|
// except for fill(next), fill(linear)
|
||||||
genInterpAfterDataBlock(pSliceInfo, pOperator);
|
genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pCurrentGroupRes,
|
||||||
|
pSliceInfo->pCurrentGroupRes->info.rows - 1);
|
||||||
|
|
||||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -748,6 +772,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->prevTs = 0;
|
pInfo->prevTs = 0;
|
||||||
pInfo->groupId = 0;
|
pInfo->groupId = 0;
|
||||||
pInfo->pNextGroupRes = NULL;
|
pInfo->pNextGroupRes = NULL;
|
||||||
|
pInfo->pCurrentGroupRes = NULL;
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||||
|
|
Loading…
Reference in New Issue