[TD-1978]

This commit is contained in:
Haojun Liao 2020-11-09 11:31:13 +08:00
parent 7314a0dd08
commit 276b7a6dbb
5 changed files with 91 additions and 47 deletions

View File

@ -370,7 +370,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol);
tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
}
}

36
src/inc/ttype.h Normal file
View File

@ -0,0 +1,36 @@
#ifndef TDENGINE_TTYPE_H
#define TDENGINE_TTYPE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
#define GET_TYPED_DATA(_v, _type, _data) \
switch (_type) { \
case TSDB_DATA_TYPE_TINYINT: \
(_v) = GET_INT8_VAL(_data); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
(_v) = GET_INT16_VAL(_data); \
break; \
case TSDB_DATA_TYPE_BIGINT: \
(_v) = (GET_INT64_VAL(_data)); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
(_v) = GET_FLOAT_VAL(_data); \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
(_v) = GET_DOUBLE_VAL(_data); \
break; \
default: \
(_v) = GET_INT32_VAL(_data); \
break; \
};
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TTYPE_H

View File

@ -70,7 +70,7 @@ typedef struct SPoint {
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol);
SFillColInfo* pFillCol, void* handle);
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);

View File

@ -4628,7 +4628,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo);
pQuery->fillType, pColInfo, pQInfo);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);

View File

@ -26,39 +26,10 @@
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pCol) {
if (fillType == TSDB_FILL_NONE) {
return NULL;
}
SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->interval.interval = slidingTime;
pFillInfo->interval.intervalUnit = slidingUnit;
pFillInfo->interval.sliding = slidingTime;
pFillInfo->interval.slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
if (numOfTags > 0) {
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
for (int32_t i = 0; i < numOfTags; ++i) {
pFillInfo->pTags[i].col.colId = -2; // TODO
}
}
// there are no duplicated tags in the SFillTagColInfo list
// there are no duplicated tags in the SFillTagColInfo list
static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) {
int32_t rowsize = 0;
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
@ -81,11 +52,49 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
k += 1;
}
}
rowsize += pColInfo->col.bytes;
}
pFillInfo->rowSize = rowsize;
pFillInfo->alloc = capacity;
assert(k < pFillInfo->numOfTags);
return rowsize;
}
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pCol, void* handle) {
if (fillType == TSDB_FILL_NONE) {
return NULL;
}
SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->alloc = capacity;
pFillInfo->handle = handle;
pFillInfo->interval.interval = slidingTime;
pFillInfo->interval.intervalUnit = slidingUnit;
pFillInfo->interval.sliding = slidingTime;
pFillInfo->interval.slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
if (numOfTags > 0) {
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
for (int32_t i = 0; i < numOfTags; ++i) {
pFillInfo->pTags[i].col.colId = -2; // TODO
}
}
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
return pFillInfo;
}
@ -350,7 +359,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
}
}
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, int32_t numOfTags, char* buf) {
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) {
int32_t rowIndex = pFillInfo->index;
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
@ -365,7 +374,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
char** prev = &pFillInfo->prevValues;
char** next = &pFillInfo->nextValues;
int32_t numOfTags = pFillInfo->numOfTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (FILL_IS_ASC_FILL(pFillInfo)) {
@ -381,7 +389,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
/* set the next value for interpolation */
initBeforeAfterDataBuf(pFillInfo, next);
copyCurrentRowIntoBuf(pFillInfo, srcData, numOfTags, *next);
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
}
if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
@ -476,15 +484,15 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_
// no data existed for fill operation now, append result according to the fill strategy
if (remain == 0) {
return fillExternalResults(pFillInfo, output, numOfRes);
fillExternalResults(pFillInfo, output, numOfRes);
} else {
fillResultImpl(pFillInfo, output, numOfRes);
assert(numOfRes == pFillInfo->numOfCurrent);
}
fillResultImpl(pFillInfo, output, numOfRes);
assert(numOfRes == pFillInfo->numOfCurrent);
qDebug("generated fill result, src block:%d, index:%d, startKey:%"PRId64", currentKey:%"PRId64", current:%d, total:%d",
pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->currentKey, pFillInfo->numOfCurrent,
pFillInfo->numOfTotal);
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%"PRId64"-%"PRId64", currentKey:%"PRId64", current:%d, total:%d, %p",
pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, pFillInfo->numOfCurrent,
pFillInfo->numOfTotal, pFillInfo->handle);
return numOfRes;
}