enh(query): time window interpolation for interval query.

This commit is contained in:
Haojun Liao 2022-05-02 23:52:32 +08:00
parent f096f27ce8
commit ffbf455fee
7 changed files with 295 additions and 231 deletions

View File

@ -20,6 +20,12 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -40,6 +40,7 @@ extern "C" {
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "vnode.h" #include "vnode.h"
#include "executorInt.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -478,16 +479,8 @@ typedef struct SFillOperatorInfo {
void** p; void** p;
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
SInterval intervalInfo;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; // group by columns, SArray<SColumn> SArray* pGroupCols; // group by columns, SArray<SColumn>
@ -676,7 +669,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions); SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo); bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);

View File

@ -27,13 +27,12 @@ extern "C" {
struct SSDataBlock; struct SSDataBlock;
typedef struct SFillColInfo { typedef struct SFillColInfo {
// STColumn col; // column info SExprInfo *pExpr;
SResSchema col; // SResSchema schema;
int16_t functionId; // sql function id // int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list int16_t tagIndex; // index of current tag in SFillTagColInfo array list
int32_t offset; SVariant fillVal;
union {int64_t i; double d;} val;
} SFillColInfo; } SFillColInfo;
typedef struct { typedef struct {
@ -56,9 +55,10 @@ typedef struct SFillInfo {
int32_t numOfCols; // number of columns, including the tags columns int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row int32_t rowSize; // size of each row
SInterval interval; SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data SArray *prev;
char** pData; // original result data block involved in filling data SArray *next;
SSDataBlock *pSrcBlock;
int32_t alloc; // data buffer size in rows int32_t alloc; // data buffer size in rows
SFillColInfo* pFillCol; // column info for fill operations SFillColInfo* pFillCol; // column info for fill operations
@ -72,7 +72,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val); struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
@ -80,7 +80,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
struct SFillColInfo* pCol, const char* id); struct SFillColInfo* pCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo); int64_t getFillInfoStart(struct SFillInfo *pFillInfo);

View File

@ -3201,16 +3201,16 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pOutput, int32_t capacity, void** p) { int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { // for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); // SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
// p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); // p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
// } // }
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows); int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
pOutput->info.rows += numOfRows; pBlock->info.rows += numOfRows;
return pOutput->info.rows; return pBlock->info.rows;
} }
void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) { void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) {
@ -5559,7 +5559,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
} }
@ -5568,7 +5568,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false; *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) { if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
return; return;
} }
@ -5629,7 +5629,8 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p); blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity);
// current group has no more result to return // current group has no more result to return
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
@ -6203,19 +6204,17 @@ _error:
return NULL; return NULL;
} }
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
// TODO set correct time precision
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w); getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id); pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
@ -6223,15 +6222,29 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
} }
} }
static SArray* getFillValue(SNodeListNode* pNodeList) {
SArray* pList = taosArrayInit(4, sizeof(SVariant));
size_t len = LIST_LENGTH(pNodeList->pNodeList);
for(int32_t i = 0; i < len; ++i) {
SValueNode* pvalue = (SValueNode*)nodesListGetNode(pNodeList->pNodeList, i);
SVariant v = {0};
valueNodeToVariant(pvalue, &v);
taosArrayPush(pList, &v);
}
return pList;
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* pValueNode,
bool multigroupResult, SExecTaskInfo* pTaskInfo) { bool multigroupResult, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval;
int32_t type = TSDB_FILL_NONE; int32_t type = TSDB_FILL_NONE;
switch (fillType) { switch (fillType) {
@ -6260,7 +6273,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity, int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type); pTaskInfo->id.str, pInterval, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -6269,7 +6282,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
@ -6618,13 +6631,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t primaryTsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t primaryTsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as,
pTableGroupInfo, pTaskInfo); pTableGroupInfo, pTaskInfo);
// if (pIntervalPhyNode->pFill != NULL) {
// pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode,
// NULL,
// false, pTaskInfo);
// }
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
@ -6662,6 +6668,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((STableIntervalOperatorInfo*)ops[0]->info)->interval;
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -13,20 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "function.h"
#include "os.h" #include "os.h"
#include "querynodes.h"
#include "taosdef.h" #include "taosdef.h"
#include "tmsg.h" #include "tmsg.h"
#include "ttypes.h" #include "ttypes.h"
#include "tfill.h"
#include "function.h"
#include "tcommon.h" #include "tcommon.h"
#include "thash.h" #include "thash.h"
#include "ttime.h" #include "ttime.h"
#include "function.h"
#include "tdatablock.h"
#include "executorInt.h"
#include "querynodes.h"
#include "tfill.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) #define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
@ -37,168 +38,210 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
continue; continue;
} }
char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows); SResSchema* pSchema = &pCol->pExpr->base.resSchema;
char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
// assert (pTag->col.colId == pCol->col.colId);
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
} }
} }
static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) { static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column. // the first are always the timestamp column, so start from the second column.
for (int32_t i = 1; i < numOfCol; ++i) { for (int32_t i = 1; i < pBlock->info.numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(p, rowIndex);
char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex);
setNull(output, pCol->col.type, pCol->col.bytes);
} }
} }
static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) { #define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
char* prev = pFillInfo->prevValues; #define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
char* next = pFillInfo->nextValues;
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock *pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) {
SPoint point1, point2, point; SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value // set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent; int32_t index = pFillInfo->numOfCurrent;
char* val = elePtrAt(data[0], TSDB_KEYSIZE, index); SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
char* val = colDataGetData(pCol0, index);
*(TSKEY*) val = pFillInfo->currentKey; *(TSKEY*) val = pFillInfo->currentKey;
// set the other values // set the other values
if (pFillInfo->type == TSDB_FILL_PREV) { if (pFillInfo->type == TSDB_FILL_PREV) {
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next; SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) { if (TSDB_COL_IS_TAG(pCol->flag)) {
continue; continue;
} }
char* output = elePtrAt(data[i], pCol->col.bytes, index); SGroupKeys* pKey = taosArrayGet(p, i);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
} doSetVal(pDstColInfoData, index, pKey);
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
} }
} else if (pFillInfo->type == TSDB_FILL_NEXT) { } else if (pFillInfo->type == TSDB_FILL_NEXT) {
char* p = FILL_IS_ASC_FILL(pFillInfo)? next : prev; SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) { if (TSDB_COL_IS_TAG(pCol->flag)) {
continue; continue;
} }
char* output = elePtrAt(data[i], pCol->col.bytes, index); SGroupKeys* pKey = taosArrayGet(p, i);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
} doSetVal(pDstColInfoData, index, pKey);
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
} }
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value // TODO : linear interpolation supports NULL value
if (prev != NULL && !outOfBound) { if (outOfBound) {
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) { if (TSDB_COL_IS_TAG(pCol->flag)) {
continue; continue;
} }
int16_t type = pCol->col.type; int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int16_t bytes = pCol->col.bytes;
char *val1 = elePtrAt(data[i], pCol->col.bytes, index); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
setNull(val1, pCol->col.type, bytes);
int16_t type = pCol->pExpr->base.resSchema.type;
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue; continue;
} }
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset}; SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; int64_t prevTs = *(int64_t*)pKey1->pData;
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index);
point1 = (SPoint){.key = prevTs, .val = pKey->pData};
point2 = (SPoint){.key = ts, .val = data};
int64_t out = 0;
point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type); taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
colDataAppend(pDstCol, index, (const char*)&out, false);
} }
} else {
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
} }
} else { // fill the default value */ } else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else { // fill with user specified value for each column
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue; continue;
} }
char* val1 = elePtrAt(data[i], pCol->col.bytes, index); SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
}
// colDataAppend(pDst, index, (char*)&pVar->i, false);
} }
} }
setTagsValue(pFillInfo, data, index); // setTagsValue(pFillInfo, data, index);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, SInterval* pInterval = &pFillInfo->interval;
pFillInfo->interval.precision); pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->numOfCurrent++; pFillInfo->numOfCurrent++;
} }
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
if (*next != NULL) { if (pKey->isNull) {
colDataAppendNULL(pDstCol, rowIndex);
} else {
colDataAppend(pDstCol, rowIndex, pKey->pData, false);
}
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
if (taosArrayGetSize(pFillInfo->next) > 0) {
return; return;
} }
*next = taosMemoryCalloc(1, pFillInfo->rowSize); for (int i = 0; i < pFillInfo->numOfCols; i++) {
for (int i = 1; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
SGroupKeys key = {0};
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
key.pData = taosMemoryMalloc(pSchema->bytes);
key.isNull = true;
key.bytes = pSchema->bytes;
key.type = pSchema->type;
taosArrayPush(pFillInfo->next, &key);
key.pData = taosMemoryMalloc(pSchema->bytes);
taosArrayPush(pFillInfo->prev, &key);
} }
} }
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) { static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
int32_t rowIndex = pFillInfo->index;
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);
memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} }
} }
static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) { static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
pFillInfo->numOfCurrent = 0; pFillInfo->numOfCurrent = 0;
char** srcData = pFillInfo->pData; // todo make sure the first column is always the primary timestamp column?
char** prev = &pFillInfo->prevValues; SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
char** next = &pFillInfo->nextValues;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
if (FILL_IS_ASC_FILL(pFillInfo)) { #if 0
assert(pFillInfo->currentKey >= pFillInfo->start); ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
} else { #endif
assert(pFillInfo->currentKey <= pFillInfo->start);
}
while (pFillInfo->numOfCurrent < outputRows) { while (pFillInfo->numOfCurrent < outputRows) {
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index]; int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
// set the next value for interpolation // set the next value for interpolation
if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) { copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
initBeforeAfterDataBuf(pFillInfo, next);
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
} }
if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
pFillInfo->numOfCurrent < outputRows) { // fill the gap between two input rows
while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
// fill the gap between two actual input rows doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
pFillInfo->numOfCurrent < outputRows) {
doFillOneRowResult(pFillInfo, data, srcData, ts, false);
} }
// output buffer is full, abort // output buffer is full, abort
@ -208,61 +251,66 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} }
} else { } else {
assert(pFillInfo->currentKey == ts); assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index; ++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next); copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
--pFillInfo->index; --pFillInfo->index;
} }
// assign rows to dst buffer // assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue; continue;
} }
char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent); int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) || SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) { SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->offset, src, pCol->col.bytes); char* src = colDataGetData(pSrc, pFillInfo->index);
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else { // i > 0 and data is null , do interpolation } else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) { if (pFillInfo->type == TSDB_FILL_PREV) {
assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type); SGroupKeys *pKey = taosArrayGet(pFillInfo->prev, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type); bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
memcpy(*prev + pCol->offset, src, pCol->col.bytes); colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
} else if (pFillInfo->type == TSDB_FILL_NEXT) { } else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) { SGroupKeys *pKey = taosArrayGet(pFillInfo->next, i);
assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type); doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else { } else {
setNull(output, pCol->col.type, pCol->col.bytes); SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
} colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
} else {
assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
} }
} }
} }
// set the tag value for final result // set the tag value for final result
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); // setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
SInterval *pInterval = &pFillInfo->interval;
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
pFillInfo->index += 1; pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1; pFillInfo->numOfCurrent += 1;
} }
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) { if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
/* the raw data block is exhausted, next value does not exists */ /* the raw data block is exhausted, next value does not exists */
if (pFillInfo->index >= pFillInfo->numOfRows) { // if (pFillInfo->index >= pFillInfo->numOfRows) {
taosMemoryFreeClear(*next); // taosMemoryFreeClear(*next);
} // }
pFillInfo->numOfTotal += pFillInfo->numOfCurrent; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return pFillInfo->numOfCurrent; return pFillInfo->numOfCurrent;
} }
@ -271,14 +319,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
return pFillInfo->numOfCurrent; return pFillInfo->numOfCurrent;
} }
static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) { static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
SGroupKeys *pKey = taosArrayGet(rowBuf, columnIndex);
if (isNull) {
pKey->isNull = true;
} else {
memcpy(pKey->pData, src, pKey->bytes);
pKey->isNull = false;
}
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
/* /*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of * These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data. * real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/ */
pFillInfo->numOfCurrent = 0; pFillInfo->numOfCurrent = 0;
while (pFillInfo->numOfCurrent < resultCapacity) { while (pFillInfo->numOfCurrent < resultCapacity) {
doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true); doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
} }
pFillInfo->numOfTotal += pFillInfo->numOfCurrent; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
@ -295,15 +353,15 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
int32_t k = 0; int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = NULL; SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) { if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
numOfTags += 1; numOfTags += 1;
bool exists = false; bool exists = false;
int32_t index = -1; int32_t index = -1;
for (int32_t j = 0; j < k; ++j) { for (int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) { if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
exists = true; exists = true;
index = j; index = j;
break; break;
@ -311,12 +369,12 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
} }
if (!exists) { if (!exists) {
SSchema* pSchema = &pFillInfo->pTags[k].col; SSchema* pSchema1 = &pFillInfo->pTags[k].col;
pSchema->colId = pColInfo->col.slotId; pSchema1->colId = pSchema->slotId;
pSchema->type = pColInfo->col.type; pSchema1->type = pSchema->type;
pSchema->bytes = pColInfo->col.bytes; pSchema1->bytes = pSchema->bytes;
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pColInfo->col.bytes); pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
pColInfo->tagIndex = k; pColInfo->tagIndex = k;
k += 1; k += 1;
@ -325,7 +383,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
} }
} }
rowsize += pColInfo->col.bytes; rowsize += pSchema->bytes;
} }
pFillInfo->numOfTags = numOfTags; pFillInfo->numOfTags = numOfTags;
@ -355,7 +413,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
} }
taosResetFillInfo(pFillInfo, skey); taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order; pFillInfo->order = order;
switch(fillType) { switch(fillType) {
@ -364,6 +421,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break; case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break; case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break; case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
case FILL_MODE_VALUE: pFillInfo->type = TSDB_FILL_SET_VALUE; break;
default: default:
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
@ -376,7 +434,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->alloc = capacity; pFillInfo->alloc = capacity;
pFillInfo->id = id; pFillInfo->id = id;
pFillInfo->interval = *pInterval; pFillInfo->interval = *pInterval;
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
// if (numOfTags > 0) { // if (numOfTags > 0) {
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo)); pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
@ -385,6 +442,11 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
} }
// } // }
pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));
initBeforeAfterDataBuf(pFillInfo);
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0); assert(pFillInfo->rowSize > 0);
return pFillInfo; return pFillInfo;
@ -405,18 +467,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
return NULL; return NULL;
} }
taosMemoryFreeClear(pFillInfo->prevValues); taosArrayDestroy(pFillInfo->prev);
taosMemoryFreeClear(pFillInfo->nextValues); taosArrayDestroy(pFillInfo->next);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) { for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
} }
taosMemoryFreeClear(pFillInfo->pTags); taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pData);
taosMemoryFreeClear(pFillInfo->pFillCol); taosMemoryFreeClear(pFillInfo->pFillCol);
taosMemoryFreeClear(pFillInfo); taosMemoryFreeClear(pFillInfo);
return NULL; return NULL;
} }
@ -436,18 +495,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
} }
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { pFillInfo->pSrcBlock = (SSDataBlock*) pInput;
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData;
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.slotId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
}
}
} }
bool taosFillHasMoreResults(SFillInfo* pFillInfo) { bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
@ -465,8 +513,9 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
} }
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
int64_t* tsList = (int64_t*) pFillInfo->pData[0]; SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
int64_t* tsList = (int64_t*) pCol->pData;
int32_t numOfRows = taosNumOfRemainRows(pFillInfo); int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = ekey; TSKEY ekey1 = ekey;
@ -513,7 +562,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) { int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity); int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
@ -521,9 +570,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t cap
// no data existed for fill operation now, append result according to the fill strategy // no data existed for fill operation now, append result according to the fill strategy
if (remain == 0) { if (remain == 0) {
appendFilledResult(pFillInfo, output, numOfRes); appendFilledResult(pFillInfo, p, numOfRes);
} else { } else {
fillResultImpl(pFillInfo, output, (int32_t) numOfRes); fillResultImpl(pFillInfo, p, (int32_t) numOfRes);
assert(numOfRes == pFillInfo->numOfCurrent); assert(numOfRes == pFillInfo->numOfCurrent);
} }
@ -538,28 +587,30 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
return pFillInfo->start; return pFillInfo->start;
} }
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) { SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) {
int32_t offset = 0; SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
if (pFillCol == NULL) { if (pFillCol == NULL) {
return NULL; return NULL;
} }
size_t len = (pValNode != NULL)? LIST_LENGTH(pValNode->pNodeList):0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExpr[i]; SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].pExpr = pExprInfo;
pFillCol[i].col = pExprInfo->base.resSchema;
pFillCol[i].offset = offset;
pFillCol[i].tagIndex = -2; pFillCol[i].tagIndex = -2;
// todo refactor
if (len > 0) {
// if the user specified value is less than the column, alway use the last one as the fill value
int32_t index = (i >= len)? (len - 1):i;
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
valueNodeToVariant(pv, &pFillCol[i].fillVal);
}
if (pExprInfo->base.numOfParams > 0) { if (pExprInfo->base.numOfParams > 0) {
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
} }
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
// pFillCol[i].val.d = *val;
offset += pExprInfo->base.resSchema.bytes;
} }
return pFillCol; return pFillCol;

View File

@ -1521,7 +1521,7 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
SFillPhysiNode* pNode = (SFillPhysiNode*)pObj; SFillPhysiNode* pNode = (SFillPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj); int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode); code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode);
} }

View File

@ -945,7 +945,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }