fix: modify pull linear
This commit is contained in:
parent
a812847317
commit
a614ef777f
|
@ -92,8 +92,8 @@ typedef struct SResultRowData {
|
|||
|
||||
typedef struct SStreamFillLinearInfo {
|
||||
TSKEY nextEnd;
|
||||
SArray* pDeltaVal; // double. value for Fill(linear).
|
||||
SArray* pNextDeltaVal; // double. value for Fill(linear).
|
||||
SArray* pEndPoints;
|
||||
SArray* pNextEndPoints;
|
||||
int64_t winIndex;
|
||||
bool hasNext;
|
||||
} SStreamFillLinearInfo;
|
||||
|
|
|
@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void destroySPoint(void* ptr) {
|
||||
SPoint* point = (SPoint*) ptr;
|
||||
taosMemoryFreeClear(point->val);
|
||||
}
|
||||
|
||||
void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
|
||||
taosArrayDestroy(pFillLinear->pDeltaVal);
|
||||
taosArrayDestroy(pFillLinear->pNextDeltaVal);
|
||||
taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
|
||||
taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
|
||||
taosMemoryFree(pFillLinear);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR
|
|||
}
|
||||
}
|
||||
|
||||
static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol,
|
||||
int32_t numOfCol, int32_t winCount) {
|
||||
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
|
||||
int32_t numOfCol) {
|
||||
for (int32_t i = 0; i < numOfCol; i++) {
|
||||
if (!pFillCol[i].notFillCol) {
|
||||
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
|
||||
SResultCellData* pSCell = getResultCell(pStartRow, slotId);
|
||||
double start = 0.0;
|
||||
GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData);
|
||||
SResultCellData* pECell = getResultCell(pEndRow, slotId);
|
||||
double end = 0.0;
|
||||
GET_TYPED_DATA(end, double, pECell->type, pECell->pData);
|
||||
double delta = (end - start) / winCount;
|
||||
taosArraySet(pDelta, slotId, &delta);
|
||||
SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
|
||||
pPoint->key = pEndRow->key;
|
||||
memcpy(pPoint->val, pECell->pData, pECell->bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
|
|||
setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
|
||||
pFillInfo->pLinearInfo->hasNext = false;
|
||||
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
|
||||
int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding,
|
||||
pFillSup->interval.slidingUnit, pFillSup->interval.precision);
|
||||
calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols, numOfWins);
|
||||
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols);
|
||||
pFillInfo->pResRow = &pFillSup->prev;
|
||||
pFillInfo->pLinearInfo->winIndex = 0;
|
||||
} break;
|
||||
|
@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
|||
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
||||
pFillInfo->pos = FILL_POS_MID;
|
||||
pFillInfo->pLinearInfo->nextEnd = nextWKey;
|
||||
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding,
|
||||
pFillSup->interval.slidingUnit, pFillSup->interval.precision);
|
||||
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols, numOfWins);
|
||||
calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols);
|
||||
pFillInfo->pResRow = &pFillSup->prev;
|
||||
|
||||
numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols, numOfWins);
|
||||
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols);
|
||||
pFillInfo->pLinearInfo->hasNext = true;
|
||||
} else if (hasPrevWindow(pFillSup)) {
|
||||
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
||||
pFillInfo->pos = FILL_POS_END;
|
||||
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
|
||||
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding,
|
||||
pFillSup->interval.slidingUnit, pFillSup->interval.precision);
|
||||
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols, numOfWins);
|
||||
calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols);
|
||||
pFillInfo->pResRow = &pFillSup->prev;
|
||||
pFillInfo->pLinearInfo->hasNext = false;
|
||||
} else {
|
||||
|
@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
|||
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
|
||||
pFillInfo->pos = FILL_POS_START;
|
||||
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
|
||||
int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding,
|
||||
pFillSup->interval.slidingUnit, pFillSup->interval.precision);
|
||||
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols, numOfWins);
|
||||
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
|
||||
pFillSup->numOfAllCols);
|
||||
pFillInfo->pResRow = &pFillSup->cur;
|
||||
pFillInfo->pLinearInfo->hasNext = false;
|
||||
}
|
||||
|
@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
colDataSetNULL(pColData, index);
|
||||
continue;
|
||||
}
|
||||
double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId);
|
||||
SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
|
||||
double vCell = 0;
|
||||
GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData);
|
||||
vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex;
|
||||
int64_t result = 0;
|
||||
SET_TYPED_DATA(&result, pCell->type, vCell);
|
||||
colDataSetVal(pColData, index, (const char*)&result, false);
|
||||
SPoint start = {0};
|
||||
start.key = pFillInfo->pResRow->key;
|
||||
start.val = pCell->pData;
|
||||
|
||||
SPoint cur = {0};
|
||||
cur.key = pFillInfo->current;
|
||||
cur.val = taosMemoryCalloc(1, pCell->bytes);
|
||||
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
|
||||
colDataSetVal(pColData, index, (const char*)cur.val, false);
|
||||
destroySPoint(&cur);
|
||||
}
|
||||
}
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
|
@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
|
|||
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
|
||||
pFillInfo->pLinearInfo->hasNext = false;
|
||||
pFillInfo->pLinearInfo->winIndex = 0;
|
||||
taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal);
|
||||
taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal);
|
||||
taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
|
||||
pFillInfo->pResRow = &pFillSup->cur;
|
||||
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
|
||||
doStreamFillLinear(pFillSup, pFillInfo, pRes);
|
||||
|
@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
|
|||
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
|
||||
pFillInfo->pLinearInfo->hasNext = false;
|
||||
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
|
||||
pFillInfo->pLinearInfo->pDeltaVal = NULL;
|
||||
pFillInfo->pLinearInfo->pNextDeltaVal = NULL;
|
||||
pFillInfo->pLinearInfo->pEndPoints = NULL;
|
||||
pFillInfo->pLinearInfo->pNextEndPoints = NULL;
|
||||
if (pFillSup->type == TSDB_FILL_LINEAR) {
|
||||
pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double));
|
||||
pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double));
|
||||
pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
|
||||
pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
|
||||
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
|
||||
double value = 0.0;
|
||||
taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value);
|
||||
taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value);
|
||||
SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
|
||||
SPoint value = {0};
|
||||
value.val = taosMemoryCalloc(1, pColData->info.bytes);
|
||||
taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
|
||||
|
||||
value.val = taosMemoryCalloc(1, pColData->info.bytes);
|
||||
taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
|
||||
}
|
||||
}
|
||||
pFillInfo->pLinearInfo->winIndex = 0;
|
||||
|
|
Loading…
Reference in New Issue