diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 4aa751574c..6280cc520a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -979,14 +979,13 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO pQueryInfo->limit.offset -= newRows; pRes->numOfRows = 0; - int32_t rpoints = taosNumOfRemainRows(pFillInfo); - if (rpoints <= 0) { + if (!taosFillHasMoreResults(pFillInfo)) { if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted break; } // all output in current group are completed - int32_t totalRemainRows = (int32_t)getNumOfResWithFill(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); + int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); if (totalRemainRows <= 0) { break; } @@ -1337,14 +1336,14 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { SLocalReducer *pLocalReducer = pRes->pLocalReducer; SFillInfo *pFillInfo = pLocalReducer->pFillInfo; - if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) { + if (pFillInfo != NULL && taosFillHasMoreResults(pFillInfo)) { assert(pQueryInfo->fillType != TSDB_FILL_NONE); tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); // the first column must be the timestamp column - int32_t rows = (int32_t) getNumOfResWithFill(pFillInfo, etime, pLocalReducer->resColModel->capacity); + int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do fill gap doFillResult(pSql, pLocalReducer, false); } @@ -1373,7 +1372,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) { int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey; - int32_t rows = (int32_t)getNumOfResWithFill(pFillInfo, etime, pLocalReducer->resColModel->capacity); + int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { doFillResult(pSql, pLocalReducer, true); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index a09e96d678..44b21c1337 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4514,6 +4514,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery } } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->fillType = TSDB_FILL_PREV; + } else if (strncasecmp(pItem->pVar.pz, "next", 4) == 0 && pItem->pVar.nLen == 4) { + pQueryInfo->fillType = TSDB_FILL_NEXT; } else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) { pQueryInfo->fillType = TSDB_FILL_LINEAR; } else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) { diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 14108abd8c..0d5910ea38 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -524,15 +524,18 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) { switch (type) { case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: *((int8_t *)val) = GET_INT8_VAL(src); break; case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: *((int16_t *)val) = GET_INT16_VAL(src); break; - case TSDB_DATA_TYPE_INT: { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: *((int32_t *)val) = GET_INT32_VAL(src); break; - } + case TSDB_DATA_TYPE_FLOAT: SET_FLOAT_VAL(val, GET_FLOAT_VAL(src)); break; @@ -540,6 +543,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) { SET_DOUBLE_VAL(val, GET_DOUBLE_VAL(src)); break; case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_TIMESTAMP: *((int64_t *)val) = GET_INT64_VAL(src); break; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ef6e0da29b..4582efbc40 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -153,30 +153,31 @@ enum _mgmt_table { #define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_CHANGE_COLUMN 7 -#define TSDB_FILL_NONE 0 -#define TSDB_FILL_NULL 1 -#define TSDB_FILL_SET_VALUE 2 -#define TSDB_FILL_LINEAR 3 -#define TSDB_FILL_PREV 4 +#define TSDB_FILL_NONE 0 +#define TSDB_FILL_NULL 1 +#define TSDB_FILL_SET_VALUE 2 +#define TSDB_FILL_LINEAR 3 +#define TSDB_FILL_PREV 4 +#define TSDB_FILL_NEXT 5 -#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_PRIVILEGES 0x2 -#define TSDB_KILL_MSG_LEN 30 +#define TSDB_KILL_MSG_LEN 30 -#define TSDB_VN_READ_ACCCESS ((char)0x1) -#define TSDB_VN_WRITE_ACCCESS ((char)0x2) +#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) -#define TSDB_COL_NORMAL 0x0u // the normal column of the table -#define TSDB_COL_TAG 0x1u // the tag column type -#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column -#define TSDB_COL_NULL 0x4u // the column filter NULL or not +#define TSDB_COL_NORMAL 0x0u // the normal column of the table +#define TSDB_COL_TAG 0x1u // the tag column type +#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column +#define TSDB_COL_NULL 0x4u // the column filter NULL or not -#define TSDB_COL_IS_TAG(f) (((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0) -#define TSDB_COL_IS_NORMAL_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) -#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC) -#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +#define TSDB_COL_IS_TAG(f) (((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0) +#define TSDB_COL_IS_NORMAL_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) +#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC) +#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) extern char *taosMsg[]; diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index 385ae88543..93537ec3da 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -82,9 +82,9 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** p void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); -int64_t getNumOfResWithFill(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); +bool taosFillHasMoreResults(SFillInfo* pFillInfo); -int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); +int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e3cf6fd254..0cf8112eb4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4176,7 +4176,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc } } -bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { +bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; @@ -4186,8 +4186,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { // There are results not returned to client yet, so filling applied to the remain result is required firstly. - int32_t remain = taosNumOfRemainRows(pFillInfo); - if (remain > 0) { + if (taosFillHasMoreResults(pFillInfo)) { return true; } @@ -4201,7 +4200,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { * first result row in the actual result set will fill nothing. */ if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - int32_t numOfTotal = (int32_t)getNumOfResWithFill(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity); + int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity); return numOfTotal > 0; } @@ -4269,7 +4268,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data setQueryStatus(pQuery, QUERY_OVER); } } else { - if (!queryHasRemainResForTableQuery(&pQInfo->runtimeEnv)) { + if (!hasNotReturnedResults(&pQInfo->runtimeEnv)) { setQueryStatus(pQuery, QUERY_OVER); } } @@ -4296,7 +4295,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0); ret -= (int32_t)pQuery->limit.offset; - // todo !!!!there exactly number of interpo is not valid. for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset, ret * pQuery->pExpr1[i].bytes); @@ -4314,7 +4312,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int ret = 0; } - if (!queryHasRemainResForTableQuery(pRuntimeEnv)) { + if (!hasNotReturnedResults(pRuntimeEnv)) { return ret; } } @@ -5774,7 +5772,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (queryHasRemainResForTableQuery(pRuntimeEnv)) { + if (hasNotReturnedResults(pRuntimeEnv)) { if (pQuery->fillType != TSDB_FILL_NONE) { /* * There are remain results that are not returned due to result interpolation diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index ca1203cb17..65b58467b7 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -25,6 +25,251 @@ #include "queryLog.h" #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) +#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) + +static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) { + for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { + SFillColInfo* pCol = &pFillInfo->pFillCol[j]; + if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { + continue; + } + + char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows); + + assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); + SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; + + assert (pTag->col.colId == pCol->col.colId); + assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); + } +} + +static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) { + // the first are always the timestamp column, so start from the second column. + for (int32_t i = 1; i < numOfCol; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + + char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex); + setNull(output, pCol->col.type, pCol->col.bytes); + } +} + +static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) { + char* prev = pFillInfo->prevValues; + char* next = pFillInfo->nextValues; + + SPoint point1, point2, point; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); + + // set the primary timestamp column value + int32_t index = pFillInfo->numOfCurrent; + char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index); + *(TSKEY*) val = pFillInfo->currentKey; + + // set the other values + if (pFillInfo->type == TSDB_FILL_PREV) { + char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next; + + if (p != NULL) { + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; + } + + char* output = elePtrAt(data[i]->data, pCol->col.bytes, index); + assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); + } + } else { // no prev value yet, set the value for NULL + setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); + } + } else if (pFillInfo->type == TSDB_FILL_NEXT) { + char* p = FILL_IS_ASC_FILL(pFillInfo)? next : prev; + + if (p != NULL) { + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; + } + + char* output = elePtrAt(data[i]->data, pCol->col.bytes, index); + assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); + } + } else { // no prev value yet, set the value for NULL + setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); + } + } else if (pFillInfo->type == TSDB_FILL_LINEAR) { + // TODO : linear interpolation supports NULL value + if (prev != NULL && !outOfBound) { + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; + } + + int16_t type = pCol->col.type; + int16_t bytes = pCol->col.bytes; + + char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); + if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { + setNull(val1, pCol->col.type, bytes); + continue; + } + + point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; + point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; + point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; + taosGetLinearInterpolationVal(type, &point1, &point2, &point); + } + } else { + setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); + } + } else { /* fill the default value */ + for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; + } + + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); + assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); + } + } + + setTagsValue(pFillInfo, data, index); + pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision); + pFillInfo->numOfCurrent++; +} + +static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { + if (*next != NULL) { + return; + } + + *next = calloc(1, pFillInfo->rowSize); + for (int i = 1; i < pFillInfo->numOfCols; i++) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes); + } +} + +static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) { + int32_t rowIndex = pFillInfo->index; + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); + } +} + +static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) { + pFillInfo->numOfCurrent = 0; + + char** srcData = pFillInfo->pData; + char** prev = &pFillInfo->prevValues; + char** next = &pFillInfo->nextValues; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); + + if (FILL_IS_ASC_FILL(pFillInfo)) { + assert(pFillInfo->currentKey >= pFillInfo->start); + } else { + assert(pFillInfo->currentKey <= pFillInfo->start); + } + + while (pFillInfo->numOfCurrent < outputRows) { + int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index]; + + // set the next value for interpolation + if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) { + initBeforeAfterDataBuf(pFillInfo, next); + copyCurrentRowIntoBuf(pFillInfo, srcData, *next); + } + + if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && + pFillInfo->numOfCurrent < outputRows) { + + // fill the gap between two actual input rows + while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && + pFillInfo->numOfCurrent < outputRows) { + doFillOneRowResult(pFillInfo, data, srcData, ts, false); + } + + // output buffer is full, abort + if (pFillInfo->numOfCurrent == outputRows) { + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; + return outputRows; + } + } else { + assert(pFillInfo->currentKey == ts); + initBeforeAfterDataBuf(pFillInfo, prev); + + // assign rows to dst buffer + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (TSDB_COL_IS_TAG(pCol->flag)) { + continue; + } + + char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent); + char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index); + + if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) || + (pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) { + assignVal(output, src, pCol->col.bytes, pCol->col.type); + memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); + } else { // i > 0 and data is null , do interpolation + if (pFillInfo->type == TSDB_FILL_PREV) { + assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type); + } else if (pFillInfo->type == TSDB_FILL_LINEAR) { + assignVal(output, src, pCol->col.bytes, pCol->col.type); + memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); + } else { + assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); + } + } + } + + // set the tag value for final result + setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); + + pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, + pFillInfo->interval.slidingUnit, pFillInfo->precision); + pFillInfo->index += 1; + pFillInfo->numOfCurrent += 1; + } + + if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) { + /* the raw data block is exhausted, next value does not exists */ + if (pFillInfo->index >= pFillInfo->numOfRows) { + tfree(*next); + } + + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; + return pFillInfo->numOfCurrent; + } + } + + return pFillInfo->numOfCurrent; +} + +static int64_t appendFilledResult(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) { + /* + * These data are generated according to fill strategy, since the current timestamp is out of the time window of + * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. + */ + pFillInfo->numOfCurrent = 0; + while (pFillInfo->numOfCurrent < resultCapacity) { + doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true); + } + + pFillInfo->numOfTotal += pFillInfo->numOfCurrent; + + assert(pFillInfo->numOfCurrent == resultCapacity); + return resultCapacity; +} // there are no duplicated tags in the SFillTagColInfo list static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) { @@ -68,6 +313,14 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t return rowsize; } +static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { + if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) { + return 0; + } + + return pFillInfo->numOfRows - pFillInfo->index; +} + SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pCol, void* handle) { @@ -76,22 +329,21 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ } SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); - taosResetFillInfo(pFillInfo, skey); - pFillInfo->order = order; - pFillInfo->type = fillType; - pFillInfo->pFillCol = pCol; + pFillInfo->order = order; + pFillInfo->type = fillType; + pFillInfo->pFillCol = pCol; pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; pFillInfo->precision = precision; pFillInfo->alloc = capacity; pFillInfo->handle = handle; - pFillInfo->interval.interval = slidingTime; + pFillInfo->interval.interval = slidingTime; pFillInfo->interval.intervalUnit = slidingUnit; - pFillInfo->interval.sliding = slidingTime; - pFillInfo->interval.slidingUnit = slidingUnit; + pFillInfo->interval.sliding = slidingTime; + pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); if (numOfTags > 0) { @@ -185,7 +437,11 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* } } -int64_t getNumOfResWithFill(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { +bool taosFillHasMoreResults(SFillInfo* pFillInfo) { + return taosNumOfRemainRows(pFillInfo) > 0; +} + +int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { int64_t* tsList = (int64_t*) pFillInfo->pData[0]; int32_t numOfRows = taosNumOfRemainRows(pFillInfo); @@ -223,16 +479,6 @@ int64_t getNumOfResWithFill(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRo return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; } -int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { - if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) { - return 0; - } - - return pFillInfo->numOfRows - pFillInfo->index; -} - -#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) - int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { double v1 = -1; double v2 = -1; @@ -256,243 +502,15 @@ int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* poin return TSDB_CODE_SUCCESS; } -static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) { - for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { - SFillColInfo* pCol = &pFillInfo->pFillCol[j]; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - continue; - } - - char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows); - - assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); - SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - - assert (pTag->col.colId == pCol->col.colId); - assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); - } -} - -static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) { - // the first are always the timestamp column, so start from the second column. - for (int32_t i = 1; i < numOfCol; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - - char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex); - setNull(output, pCol->col.type, pCol->col.bytes); - } -} - -static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) { - char* prev = pFillInfo->prevValues; - char* next = pFillInfo->nextValues; - - SPoint point1, point2, point; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - - // set the primary timestamp column value - int32_t index = pFillInfo->numOfCurrent; - char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index); - *(TSKEY*) val = pFillInfo->currentKey; - - // set the other values - if (pFillInfo->type == TSDB_FILL_PREV) { - char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next; - - if (p != NULL) { - for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } - - char* output = elePtrAt(data[i]->data, pCol->col.bytes, index); - assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); - } - } else { // no prev value yet, set the value for NULL - setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); - } - } else if (pFillInfo->type == TSDB_FILL_LINEAR) { - // TODO : linear interpolation supports NULL value - if (prev != NULL && !outOfBound) { - for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } - - int16_t type = pCol->col.type; - int16_t bytes = pCol->col.bytes; - - char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); - if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { - setNull(val1, pCol->col.type, bytes); - continue; - } - - point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; - point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; - point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; - taosGetLinearInterpolationVal(type, &point1, &point2, &point); - } - } else { - setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); - } - } else { /* fill the default value */ - for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } - - char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index); - assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); - } - } - - setTagsValue(pFillInfo, data, index); - pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision); - pFillInfo->numOfCurrent++; -} - -static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { - if (*next != NULL) { - return; - } - - *next = calloc(1, pFillInfo->rowSize); - for (int i = 1; i < pFillInfo->numOfCols; i++) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes); - } -} - -static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) { - int32_t rowIndex = pFillInfo->index; - for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); - } -} - -static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) { - pFillInfo->numOfCurrent = 0; - - char** srcData = pFillInfo->pData; - char** prev = &pFillInfo->prevValues; - char** next = &pFillInfo->nextValues; - - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - - if (FILL_IS_ASC_FILL(pFillInfo)) { - assert(pFillInfo->currentKey >= pFillInfo->start); - } else { - assert(pFillInfo->currentKey <= pFillInfo->start); - } - - while (pFillInfo->numOfCurrent < outputRows) { - int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index]; - - if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || - (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) { - /* set the next value for interpolation */ - initBeforeAfterDataBuf(pFillInfo, next); - copyCurrentRowIntoBuf(pFillInfo, srcData, *next); - } - - if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && - pFillInfo->numOfCurrent < outputRows) { - - // fill the gap between two actual input rows - while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || - (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) && - pFillInfo->numOfCurrent < outputRows) { - doFillOneRowResult(pFillInfo, data, srcData, ts, false); - } - - // output buffer is full, abort - if (pFillInfo->numOfCurrent == outputRows) { - pFillInfo->numOfTotal += pFillInfo->numOfCurrent; - return outputRows; - } - } else { - assert(pFillInfo->currentKey == ts); - initBeforeAfterDataBuf(pFillInfo, prev); - - // assign rows to dst buffer - for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { - continue; - } - - char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent); - char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index); - - if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) || - (pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) { - assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); - } else { // i > 0 and data is null , do interpolation - if (pFillInfo->type == TSDB_FILL_PREV) { - assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type); - } else if (pFillInfo->type == TSDB_FILL_LINEAR) { - assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); - } else { - assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); - } - } - } - - // set the tag value for final result - setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); - - pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, - pFillInfo->interval.slidingUnit, pFillInfo->precision); - pFillInfo->index += 1; - pFillInfo->numOfCurrent += 1; - } - - if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) { - /* the raw data block is exhausted, next value does not exists */ - if (pFillInfo->index >= pFillInfo->numOfRows) { - tfree(*next); - } - - pFillInfo->numOfTotal += pFillInfo->numOfCurrent; - return pFillInfo->numOfCurrent; - } - } - - return pFillInfo->numOfCurrent; -} - -static int64_t fillExternalResults(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) { - /* - * These data are generated according to fill strategy, since the current timestamp is out of the time window of - * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. - */ - pFillInfo->numOfCurrent = 0; - while (pFillInfo->numOfCurrent < resultCapacity) { - doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true); - } - - pFillInfo->numOfTotal += pFillInfo->numOfCurrent; - - assert(pFillInfo->numOfCurrent == resultCapacity); - return resultCapacity; -} - int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); - int64_t numOfRes = getNumOfResWithFill(pFillInfo, pFillInfo->end, capacity); + int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity); assert(numOfRes <= capacity); // no data existed for fill operation now, append result according to the fill strategy if (remain == 0) { - fillExternalResults(pFillInfo, output, numOfRes); + appendFilledResult(pFillInfo, output, numOfRes); } else { fillResultImpl(pFillInfo, output, (int32_t) numOfRes); assert(numOfRes == pFillInfo->numOfCurrent);