fix(query): set correct fill column index.
This commit is contained in:
parent
166cdcec7c
commit
7084c765d0
|
@ -3191,25 +3191,33 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
}
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
int64_t ekey =
|
||||
Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||
|
@ -3220,7 +3228,34 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
|
|||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
pInfo->pRes->info.groupId = pBlock->info.groupId;
|
||||
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
||||
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
|
||||
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
|
||||
ASSERT(pCol->notFillCol);
|
||||
|
||||
SExprInfo* pExpr = pCol->pExpr;
|
||||
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
|
||||
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
|
||||
colDataAssign(pDst1, pSrc1, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3236,8 +3271,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > 0) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
|
@ -3255,16 +3291,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
pInfo->pRes->info.groupId = pBlock->info.groupId;
|
||||
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
||||
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
|
||||
pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block
|
||||
|
@ -3293,14 +3320,18 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
|
|
|
@ -34,28 +34,9 @@
|
|||
((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
|
||||
|
||||
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
|
||||
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
|
||||
|
||||
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||
|
||||
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||
#if 0
|
||||
for (int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
|
||||
char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
|
||||
|
||||
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
|
Loading…
Reference in New Issue