Merge pull request #14805 from taosdata/feature/3_liaohj
fix(query): set the ts to be the time window start key value
This commit is contained in:
commit
be94e05e9b
|
@ -42,6 +42,7 @@ typedef struct SFillInfo {
|
||||||
TSKEY start; // start timestamp
|
TSKEY start; // start timestamp
|
||||||
TSKEY end; // endKey for fill
|
TSKEY end; // endKey for fill
|
||||||
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
|
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
|
||||||
|
int32_t tsSlotId; // primary time stamp slot id
|
||||||
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
||||||
int32_t type; // fill type
|
int32_t type; // fill type
|
||||||
int32_t numOfRows; // number of rows in the input data block
|
int32_t numOfRows; // number of rows in the input data block
|
||||||
|
@ -74,8 +75,8 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
|
||||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||||
|
|
||||||
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
SInterval* pInterval, int32_t fillType,
|
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||||
struct SFillColInfo* pCol, const char* id);
|
const char* id);
|
||||||
|
|
||||||
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
||||||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||||
|
|
|
@ -4013,10 +4013,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
|
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval,
|
||||||
|
fillType, pColInfo, pInfo->primaryTsCol, id);
|
||||||
|
|
||||||
pInfo->win = win;
|
pInfo->win = win;
|
||||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
|
||||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||||
taosMemoryFree(pInfo->pFillInfo);
|
taosMemoryFree(pInfo->pFillInfo);
|
||||||
taosMemoryFree(pInfo->p);
|
taosMemoryFree(pInfo->p);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "query.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
@ -48,14 +49,15 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
|
static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) {
|
||||||
// the first are always the timestamp column, so start from the second column.
|
// the first are always the timestamp column, so start from the second column.
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP && i == 0) {
|
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
continue;
|
colDataAppend(p, rowIndex, (const char*)&ts, false);
|
||||||
|
} else {
|
||||||
|
colDataAppendNULL(p, rowIndex);
|
||||||
}
|
}
|
||||||
colDataAppendNULL(p, rowIndex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,16 +66,17 @@ static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex)
|
||||||
|
|
||||||
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||||
|
|
||||||
static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
|
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
|
||||||
bool outOfBound) {
|
bool outOfBound) {
|
||||||
SPoint point1, point2, point;
|
SPoint point1, point2, point;
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||||
|
|
||||||
// set the primary timestamp column value
|
// set the primary timestamp column value
|
||||||
int32_t index = pFillInfo->numOfCurrent;
|
int32_t index = pFillInfo->numOfCurrent;
|
||||||
SColumnInfoData* pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pCol0 = taosArrayGet(pBlock->pDataBlock, pFillInfo->tsSlotId);
|
||||||
char* val = colDataGetData(pCol0, index);
|
char* val = colDataGetData(pCol0, index);
|
||||||
|
|
||||||
|
// set the primary timestamp value
|
||||||
*(TSKEY*)val = pFillInfo->currentKey;
|
*(TSKEY*)val = pFillInfo->currentKey;
|
||||||
|
|
||||||
// set the other values
|
// set the other values
|
||||||
|
@ -92,7 +95,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSData
|
||||||
}
|
}
|
||||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
||||||
|
// todo refactor: start from 0 not 1
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||||
|
@ -106,7 +109,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSData
|
||||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||||
// TODO : linear interpolation supports NULL value
|
// TODO : linear interpolation supports NULL value
|
||||||
if (outOfBound) {
|
if (outOfBound) {
|
||||||
setNullRow(pBlock, pFillInfo->numOfCols, index);
|
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
|
@ -143,7 +146,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
|
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
|
||||||
setNullRow(pBlock, pFillInfo->numOfCols, index);
|
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||||
} else { // fill with user specified value for each column
|
} else { // fill with user specified value for each column
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
|
@ -166,6 +169,8 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSData
|
||||||
int64_t v = 0;
|
int64_t v = 0;
|
||||||
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
||||||
colDataAppend(pDst, index, (char*)&v, false);
|
colDataAppend(pDst, index, (char*)&v, false);
|
||||||
|
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -247,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
||||||
// fill the gap between two input rows
|
// fill the gap between two input rows
|
||||||
while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
||||||
pFillInfo->numOfCurrent < outputRows) {
|
pFillInfo->numOfCurrent < outputRows) {
|
||||||
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
|
doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// output buffer is full, abort
|
// output buffer is full, abort
|
||||||
|
@ -343,7 +348,7 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int
|
||||||
*/
|
*/
|
||||||
pFillInfo->numOfCurrent = 0;
|
pFillInfo->numOfCurrent = 0;
|
||||||
while (pFillInfo->numOfCurrent < resultCapacity) {
|
while (pFillInfo->numOfCurrent < resultCapacity) {
|
||||||
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
|
doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||||
|
@ -408,7 +413,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
||||||
const char* id) {
|
const char* id) {
|
||||||
if (fillType == TSDB_FILL_NONE) {
|
if (fillType == TSDB_FILL_NONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -420,6 +425,8 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pFillInfo->tsSlotId = primaryTsSlotId;
|
||||||
|
|
||||||
taosResetFillInfo(pFillInfo, skey);
|
taosResetFillInfo(pFillInfo, skey);
|
||||||
pFillInfo->order = order;
|
pFillInfo->order = order;
|
||||||
|
|
||||||
|
@ -589,11 +596,10 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
|
||||||
assert(numOfRes == pFillInfo->numOfCurrent);
|
assert(numOfRes == pFillInfo->numOfCurrent);
|
||||||
}
|
}
|
||||||
|
|
||||||
// qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%"PRId64"-%"PRId64", currentKey:%"PRId64",
|
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
||||||
// current:%d, total:%d, %p",
|
", current : % d, total : % d, %s", pFillInfo,
|
||||||
// pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
|
pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
|
||||||
// pFillInfo->numOfCurrent,
|
pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);
|
||||||
// pFillInfo->numOfTotal, pFillInfo->handle);
|
|
||||||
|
|
||||||
return numOfRes;
|
return numOfRes;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue