[td-3047] refactor.

This commit is contained in:
Haojun Liao 2021-03-03 14:39:16 +08:00
parent 77db8bf108
commit 75a45e3f5a
2 changed files with 66 additions and 53 deletions

View File

@ -4086,22 +4086,35 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) { if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) {
*(TSKEY *)pCtx->pOutput = pCtx->startTs; *(TSKEY *)pCtx->pOutput = pCtx->startTs;
} else { } else if (type == TSDB_FILL_NULL) {
if (type == TSDB_FILL_NULL) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
} else if (type == TSDB_FILL_SET_VALUE) { } else if (type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
} else if (type == TSDB_FILL_PREV) { } else {
if (pCtx->start.key != INT64_MIN && pCtx->start.key < pCtx->startTs && pCtx->end.key > pCtx->startTs) {
// the value of prev/next/linear interpolation is placed in pCtx->start
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val);
} else {
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
}
} else {
// check the timestamp in input buffer
TSKEY skey = GET_TS_DATA(pCtx, 0);
TSKEY ekey = GET_TS_DATA(pCtx, 1);
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
if (type == TSDB_FILL_PREV) {
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) { } else if (type == TSDB_FILL_NEXT) {
char* d = GET_INPUT_DATA(pCtx, 1); char* val = pCtx->pInput + pCtx->inputBytes;
assignVal(pCtx->pOutput, d, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_LINEAR) { } else if (type == TSDB_FILL_LINEAR) {
char *start = GET_INPUT_DATA(pCtx, 0); char *start = GET_INPUT_DATA(pCtx, 0);
char *end = GET_INPUT_DATA(pCtx, 1); char *end = GET_INPUT_DATA(pCtx, 1);
TSKEY skey = GET_TS_DATA(pCtx, 0);
TSKEY ekey = GET_TS_DATA(pCtx, 1);
SPoint point1 = {.key = skey, .val = start}; SPoint point1 = {.key = skey, .val = start};
SPoint point2 = {.key = ekey, .val = end}; SPoint point2 = {.key = ekey, .val = end};
@ -4119,6 +4132,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
} }
} }
} }
}
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
} }

View File

@ -1126,9 +1126,8 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
} }
} }
void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
SArray *pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
int32_t type) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SExprInfo* pExpr = pOperator->pExpr; SExprInfo* pExpr = pOperator->pExpr;
@ -1156,11 +1155,19 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo*
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
if (functionId == TSDB_FUNC_INTERP) {
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = prevTs;
pCtx[k].start.val = v1;
pCtx[k].end.key = curTs;
pCtx[k].end.val = v2;
}
} else if (functionId == TSDB_FUNC_TWA) {
SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2}; SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v }; SPoint point = (SPoint){.key = windowKey, .val = &v };
if (functionId == TSDB_FUNC_TWA) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
if (type == RESULT_ROW_START_INTERP) { if (type == RESULT_ROW_START_INTERP) {
@ -1170,49 +1177,43 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo*
pCtx[k].end.key = point.key; pCtx[k].end.key = point.key;
pCtx[k].end.val = v; pCtx[k].end.val = v;
} }
} else {
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = prevTs;
pCtx[k].start.val = v1;
pCtx[k].end.key = curTs;
pCtx[k].end.val = v2;
}
} }
} }
} }
static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, int32_t pos,
int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) { int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
TSKEY curTs = tsCols[pos]; TSKEY curTs = tsCols[pos];
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
// start exactly from this point, no need to do interpolation // start exactly from this point, no need to do interpolation
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; TSKEY key = ascQuery? win->skey:win->ekey;
if (key == curTs) { if (key == curTs) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
return true; return true;
} }
if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) { if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
return true; return true;
} }
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))? TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))? lastTs:tsCols[pos - step];
lastTs:tsCols[pos - step];
doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos,
key, RESULT_ROW_START_INTERP);
return true; return true;
} }
static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx,
int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) {
SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperatorInfo->numOfOutput; int32_t numOfOutput = pOperatorInfo->numOfOutput;
@ -1238,7 +1239,8 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFun
assert(nextRowIndex >= 0); assert(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex]; TSKEY nextKey = tsCols[nextRowIndex];
doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP); doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey,
nextRowIndex, key, RESULT_ROW_END_INTERP);
return true; return true;
} }
@ -1251,9 +1253,6 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
} }
assert(pBlock != NULL); assert(pBlock != NULL);
int32_t fillType = pQuery->fillType;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
@ -1263,7 +1262,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
if (!done) { // it is not interpolated, now start to generated the interpolated value if (!done) { // it is not interpolated, now start to generated the interpolated value
int32_t startRowIndex = startPos; int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, pBlock->pDataBlock, bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, pBlock->pDataBlock,
tsCols, win, fillType); tsCols, win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
@ -1342,7 +1341,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doRowwiseTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
-1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);