[TD-225]refactor.
This commit is contained in:
parent
af428c4d24
commit
f1657736e1
|
@ -3704,6 +3704,67 @@ static void restoreTimeWindow(STableGroupInfo* pTableGroupInfo, STsdbQueryCond*
|
|||
pKeyInfo->lastKey = pCond->twindow.skey;
|
||||
}
|
||||
|
||||
static void handleInterpolationQuery(SQInfo* pQInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pQuery->numOfCheckedBlocks > 0 || !isPointInterpoQuery(pQuery)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
|
||||
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW);
|
||||
if (prev == NULL || next == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// setup the pCtx->start/end info and calculate the interpolation value
|
||||
SColumnInfoData *startTs = taosArrayGet(prev, 0);
|
||||
SColumnInfoData *endTs = taosArrayGet(next, 0);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
||||
int32_t functionId = pQuery->pExpr1[i].base.functionId;
|
||||
SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo;
|
||||
|
||||
if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
|
||||
aAggs[functionId].xFunction(pCtx);
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex);
|
||||
SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex);
|
||||
|
||||
assert(p->info.colId == pColIndex->colId);
|
||||
|
||||
pCtx->start.key = *(TSKEY *)startTs->pData;
|
||||
pCtx->end.key = *(TSKEY *)endTs->pData;
|
||||
|
||||
if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||
GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData);
|
||||
GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData);
|
||||
} else { // string pointer
|
||||
pCtx->start.ptr = p->pData;
|
||||
pCtx->end.ptr = n->pData;
|
||||
}
|
||||
|
||||
pCtx->param[2].i64 = (int8_t)pQuery->fillType;
|
||||
pCtx->nStartQueryTimestamp = pQuery->window.skey;
|
||||
if (pQuery->fillVal != NULL) {
|
||||
if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) {
|
||||
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
|
||||
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
|
||||
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
|
||||
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aAggs[functionId].xFunction(pCtx);
|
||||
}
|
||||
}
|
||||
|
||||
void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
@ -3769,60 +3830,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|||
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
|
||||
}
|
||||
|
||||
if (isPointInterpoQuery(pQuery) && pQuery->numOfCheckedBlocks == 0) {
|
||||
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
|
||||
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW);
|
||||
|
||||
if (prev == NULL || next == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// setup the pCtx->start/end info and calculate the interpolation value
|
||||
SColumnInfoData *startTs = taosArrayGet(prev, 0);
|
||||
SColumnInfoData *endTs = taosArrayGet(next, 0);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
||||
int32_t functionId = pQuery->pExpr1[i].base.functionId;
|
||||
SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo;
|
||||
|
||||
if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
|
||||
aAggs[functionId].xFunction(pCtx);
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex);
|
||||
SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex);
|
||||
|
||||
assert(p->info.colId == pColIndex->colId);
|
||||
|
||||
pCtx->start.key = *(TSKEY *)startTs->pData;
|
||||
pCtx->end.key = *(TSKEY *)endTs->pData;
|
||||
|
||||
if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||
GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData);
|
||||
GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData);
|
||||
} else { // string pointer
|
||||
pCtx->start.ptr = p->pData;
|
||||
pCtx->end.ptr = n->pData;
|
||||
}
|
||||
|
||||
pCtx->param[2].i64 = (int8_t)pQuery->fillType;
|
||||
pCtx->nStartQueryTimestamp = pQuery->window.skey;
|
||||
if (pQuery->fillVal != NULL) {
|
||||
if (isNull((const char*) &pQuery->fillVal[i], pCtx->inputType)) {
|
||||
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
|
||||
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
|
||||
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
|
||||
tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aAggs[functionId].xFunction(pCtx);
|
||||
}
|
||||
}
|
||||
handleInterpolationQuery(pQInfo);
|
||||
}
|
||||
|
||||
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
|
|
|
@ -1929,8 +1929,8 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
|
|||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
|
||||
// the first qualified table for interpolation query
|
||||
if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
|
||||
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
|
||||
if ((pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
|
||||
(pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue