From 276b7a6dbb33b29cd61783c2d7490459705290d4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Nov 2020 11:31:13 +0800 Subject: [PATCH] [TD-1978] --- src/client/src/tscLocalMerge.c | 2 +- src/inc/ttype.h | 36 +++++++++++++ src/query/inc/qFill.h | 2 +- src/query/src/qExecutor.c | 2 +- src/query/src/qFill.c | 96 ++++++++++++++++++---------------- 5 files changed, 91 insertions(+), 47 deletions(-) create mode 100644 src/inc/ttype.h diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index ee72a96e47..10c5e7d32e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -370,7 +370,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, - tinfo.precision, pQueryInfo->fillType, pFillCol); + tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); } } diff --git a/src/inc/ttype.h b/src/inc/ttype.h new file mode 100644 index 0000000000..caa7c58f40 --- /dev/null +++ b/src/inc/ttype.h @@ -0,0 +1,36 @@ +#ifndef TDENGINE_TTYPE_H +#define TDENGINE_TTYPE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taosdef.h" + +#define GET_TYPED_DATA(_v, _type, _data) \ + switch (_type) { \ + case TSDB_DATA_TYPE_TINYINT: \ + (_v) = GET_INT8_VAL(_data); \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ + (_v) = GET_INT16_VAL(_data); \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ + (_v) = (GET_INT64_VAL(_data)); \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ + (_v) = GET_FLOAT_VAL(_data); \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ + (_v) = GET_DOUBLE_VAL(_data); \ + break; \ + default: \ + (_v) = GET_INT32_VAL(_data); \ + break; \ + }; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TTYPE_H diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index 9589d01cc4..385ae88543 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -70,7 +70,7 @@ typedef struct SPoint { SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - SFillColInfo* pFillCol); + SFillColInfo* pFillCol, void* handle); void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e463495616..6191f536a3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4628,7 +4628,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput, pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, - pQuery->fillType, pColInfo); + pQuery->fillType, pColInfo, pQInfo); } setQueryStatus(pQuery, QUERY_NOT_COMPLETED); diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index cf9821b890..a4830f7d88 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -26,39 +26,10 @@ #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) -SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - SFillColInfo* pCol) { - if (fillType == TSDB_FILL_NONE) { - return NULL; - } - - SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); - - taosResetFillInfo(pFillInfo, skey); - - pFillInfo->order = order; - pFillInfo->type = fillType; - pFillInfo->pFillCol = pCol; - pFillInfo->numOfTags = numOfTags; - pFillInfo->numOfCols = numOfCols; - pFillInfo->precision = precision; - - pFillInfo->interval.interval = slidingTime; - pFillInfo->interval.intervalUnit = slidingUnit; - pFillInfo->interval.sliding = slidingTime; - pFillInfo->interval.slidingUnit = slidingUnit; - - pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); - if (numOfTags > 0) { - pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo)); - for (int32_t i = 0; i < numOfTags; ++i) { - pFillInfo->pTags[i].col.colId = -2; // TODO - } - } - - // there are no duplicated tags in the SFillTagColInfo list +// there are no duplicated tags in the SFillTagColInfo list +static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) { int32_t rowsize = 0; + int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; @@ -81,11 +52,49 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ k += 1; } } + rowsize += pColInfo->col.bytes; } - pFillInfo->rowSize = rowsize; - pFillInfo->alloc = capacity; + assert(k < pFillInfo->numOfTags); + return rowsize; +} + +SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, + SFillColInfo* pCol, void* handle) { + if (fillType == TSDB_FILL_NONE) { + return NULL; + } + + SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); + + taosResetFillInfo(pFillInfo, skey); + + pFillInfo->order = order; + pFillInfo->type = fillType; + pFillInfo->pFillCol = pCol; + pFillInfo->numOfTags = numOfTags; + pFillInfo->numOfCols = numOfCols; + pFillInfo->precision = precision; + pFillInfo->alloc = capacity; + pFillInfo->handle = handle; + + pFillInfo->interval.interval = slidingTime; + pFillInfo->interval.intervalUnit = slidingUnit; + pFillInfo->interval.sliding = slidingTime; + pFillInfo->interval.slidingUnit = slidingUnit; + + pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); + if (numOfTags > 0) { + pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo)); + for (int32_t i = 0; i < numOfTags; ++i) { + pFillInfo->pTags[i].col.colId = -2; // TODO + } + } + + pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); + assert(pFillInfo->rowSize > 0); return pFillInfo; } @@ -350,7 +359,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { } } -static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, int32_t numOfTags, char* buf) { +static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) { int32_t rowIndex = pFillInfo->index; for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; @@ -365,7 +374,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou char** prev = &pFillInfo->prevValues; char** next = &pFillInfo->nextValues; - int32_t numOfTags = pFillInfo->numOfTags; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); if (FILL_IS_ASC_FILL(pFillInfo)) { @@ -381,7 +389,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) { /* set the next value for interpolation */ initBeforeAfterDataBuf(pFillInfo, next); - copyCurrentRowIntoBuf(pFillInfo, srcData, numOfTags, *next); + copyCurrentRowIntoBuf(pFillInfo, srcData, *next); } if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && @@ -476,15 +484,15 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_ // no data existed for fill operation now, append result according to the fill strategy if (remain == 0) { - return fillExternalResults(pFillInfo, output, numOfRes); + fillExternalResults(pFillInfo, output, numOfRes); + } else { + fillResultImpl(pFillInfo, output, numOfRes); + assert(numOfRes == pFillInfo->numOfCurrent); } - fillResultImpl(pFillInfo, output, numOfRes); - assert(numOfRes == pFillInfo->numOfCurrent); - - qDebug("generated fill result, src block:%d, index:%d, startKey:%"PRId64", currentKey:%"PRId64", current:%d, total:%d", - pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->currentKey, pFillInfo->numOfCurrent, - pFillInfo->numOfTotal); + qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%"PRId64"-%"PRId64", currentKey:%"PRId64", current:%d, total:%d, %p", + pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, pFillInfo->numOfCurrent, + pFillInfo->numOfTotal, pFillInfo->handle); return numOfRes; }