refactor code
This commit is contained in:
parent
7500f3e27e
commit
b902ab0d61
|
@ -2121,6 +2121,40 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
return hasInterp;
|
return hasInterp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
|
||||||
|
SSDataBlock* pSrcBlock, int32_t index) {
|
||||||
|
blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1);
|
||||||
|
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
|
||||||
|
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
|
||||||
|
|
||||||
|
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
||||||
|
|
||||||
|
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
|
||||||
|
colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
|
||||||
|
} else {
|
||||||
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, index)) {
|
||||||
|
if (pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
||||||
|
colDataAppendNULL(pDst, pResBlock->info.rows);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* v = colDataGetData(pSrc, index);
|
||||||
|
colDataAppend(pDst, pResBlock->info.rows, v, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pResBlock->info.rows += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
if (pInfo->pPrevRow != NULL) {
|
if (pInfo->pPrevRow != NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2263,37 +2297,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ts == pSliceInfo->current) {
|
if (ts == pSliceInfo->current) {
|
||||||
blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1);
|
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
||||||
bool isNullRow = false;
|
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
|
|
||||||
|
|
||||||
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
|
||||||
|
|
||||||
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
|
|
||||||
colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
|
|
||||||
} else {
|
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, i)) {
|
|
||||||
if (pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
|
||||||
colDataAppendNULL(pDst, pResBlock->info.rows);
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
isNullRow = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
char* v = colDataGetData(pSrc, i);
|
|
||||||
colDataAppend(pDst, pResBlock->info.rows, v, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isNullRow) {
|
|
||||||
pResBlock->info.rows += 1;
|
|
||||||
}
|
|
||||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||||
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
||||||
|
|
||||||
|
@ -2348,37 +2353,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// add current row if timestamp match
|
// add current row if timestamp match
|
||||||
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1);
|
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
||||||
bool isNullRow = false;
|
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
|
|
||||||
|
|
||||||
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
|
||||||
|
|
||||||
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
|
|
||||||
colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
|
|
||||||
} else {
|
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, i)) {
|
|
||||||
if (pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
|
||||||
colDataAppendNULL(pDst, pResBlock->info.rows);
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
isNullRow = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
char* v = colDataGetData(pSrc, i);
|
|
||||||
colDataAppend(pDst, pResBlock->info.rows, v, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isNullRow) {
|
|
||||||
pResBlock->info.rows += 1;
|
|
||||||
}
|
|
||||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||||
|
|
||||||
pSliceInfo->current =
|
pSliceInfo->current =
|
||||||
|
|
Loading…
Reference in New Issue