diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f5dfdaf779..5f1108ae97 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -246,11 +246,14 @@ typedef struct SQueryInfo { int16_t fillType; // final result fill type int16_t numOfTables; STableMetaInfo **pTableMetaInfo; - struct STSBuf * tsBuf; + struct STSBuf *tsBuf; int64_t * fillVal; // default value for fill char * msg; // pointer to the pCmd->payload to keep error message temporarily int64_t clauseLimit; // limit for current sub clause + int64_t prjOffset; // offset value in the original sql expression, only applied at client side + int64_t tableLimit; // table limit in case of super table projection query + global order + limit + int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int16_t resColumnId; // result column id } SQueryInfo; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 3867032d46..56b7f052f7 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -426,8 +426,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { } SET_VAL(pCtx, 1, 1); - - *((int64_t *)pCtx->aOutputBuf) += 1; + *((int64_t *)pCtx->aOutputBuf) += pCtx->size; // do not need it actually SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); @@ -3632,114 +3631,119 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { return true; } -static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) { +static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t index, int32_t size) { int32_t notNullElems = 0; TSKEY *primaryKey = pCtx->ptsList; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + int32_t i = index; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); if (pCtx->start.key != INT64_MIN) { - assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN); + assert((pCtx->start.key < primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_ASC) || + (pCtx->start.key > primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_DESC)); - pInfo->lastKey = primaryKey[index]; - GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + assert(pInfo->lastKey == INT64_MIN); + + pInfo->lastKey = primaryKey[tsIndex + i]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key); pInfo->hasResult = DATA_SET_FLAG; - pInfo->win.skey = pCtx->start.key; + pInfo->win.skey = pCtx->start.key; notNullElems++; - i += 1; + i += step; } else if (pInfo->lastKey == INT64_MIN) { - pInfo->lastKey = primaryKey[index]; - GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + pInfo->lastKey = primaryKey[tsIndex + i]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); pInfo->hasResult = DATA_SET_FLAG; - pInfo->win.skey = pInfo->lastKey; + pInfo->win.skey = pInfo->lastKey; notNullElems++; - i += 1; + i += step; } // calculate the value of switch(pCtx->inputType) { case TSDB_DATA_TYPE_TINYINT: { - int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } case TSDB_DATA_TYPE_SMALLINT: { - int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } case TSDB_DATA_TYPE_INT: { - int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } case TSDB_DATA_TYPE_BIGINT: { - int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = (double) val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } case TSDB_DATA_TYPE_FLOAT: { - float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } case TSDB_DATA_TYPE_DOUBLE: { - double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index); - for (; i < size; i++) { + double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, 0); + for (; i < size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { continue; } - pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey); pInfo->lastValue = val[i]; - pInfo->lastKey = primaryKey[i]; + pInfo->lastKey = primaryKey[i + tsIndex]; } break; } @@ -3764,16 +3768,13 @@ static void twa_function(SQLFunctionCtx *pCtx) { STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); // skip null value - int32_t i = 0; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + int32_t i = (pCtx->order == TSDB_ORDER_ASC)? 0:(pCtx->size - 1); while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { - i++; + i += step; } - - if (i >= pCtx->size) { - return; - } - - int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size); + + int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, i, pCtx->size); SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { @@ -3791,11 +3792,136 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { return; } - int32_t notNullElems = twa_function_impl(pCtx, index, 1); + int32_t notNullElems = 0; + TSKEY *primaryKey = pCtx->ptsList; + + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + int32_t i = pCtx->startOffset; + int32_t size = pCtx->size; + + if (pCtx->start.key != INT64_MIN) { + assert(pInfo->lastKey == INT64_MIN); + + pInfo->lastKey = primaryKey[index]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); + + pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pCtx->start.key; + notNullElems++; + i += 1; + } else if (pInfo->lastKey == INT64_MIN) { + pInfo->lastKey = primaryKey[index]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index)); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pInfo->lastKey; + notNullElems++; + i += 1; + } + + // calculate the value of + switch(pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = (double) val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i + index]; + } + break; + } + default: assert(0); + } + + // the last interpolated time window value + if (pCtx->end.key != INT64_MIN) { + pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey); + pInfo->lastValue = pCtx->end.val; + pInfo->lastKey = pCtx->end.key; + } + + pInfo->win.ekey = pInfo->lastKey; + SET_VAL(pCtx, notNullElems, 1); + if (notNullElems > 0) { + pResInfo->hasResult = DATA_SET_FLAG; + } + if (pCtx->stableQuery) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); } } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 2cd37013c5..9fdadfa957 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -721,10 +721,16 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr // final result depends on the fields number memset(pSchema, 0, sizeof(SSchema) * size); + for (int32_t i = 0; i < size; ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); - SSchema *p1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex); + SSchema p1 = {0}; + if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) { + p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex); + } else { + p1 = tGetTableNameColumnSchema(); + } int32_t inter = 0; int16_t type = -1; @@ -743,7 +749,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr functionId = TSDB_FUNC_LAST; } - getResultDataInfo(p1->type, p1->bytes, functionId, 0, &type, &bytes, &inter, 0, false); + int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false); + assert(ret == TSDB_CODE_SUCCESS); } pSchema[i].type = (uint8_t)type; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 309d354849..abbe26204d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5307,15 +5307,18 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn // keep original limitation value in globalLimit pQueryInfo->clauseLimit = pQueryInfo->limit.limit; - pQueryInfo->prjOffset = pQueryInfo->limit.offset; + pQueryInfo->prjOffset = pQueryInfo->limit.offset; + pQueryInfo->tableLimit = -1; if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { /* - * the limitation/offset value should be removed during retrieve data from virtual node, - * since the global order are done in client side, so the limitation should also - * be done at the client side. + * the offset value should be removed during retrieve data from virtual node, since the + * global order are done in client side, so the offset is applied at the client side + * However, note that the maximum allowed number of result for each table should be less + * than or equal to the value of limit. */ if (pQueryInfo->limit.limit > 0) { + pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset; pQueryInfo->limit.limit = -1; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9e8d3d339e..5e3acaa0b7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -684,6 +684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->queryType = htonl(pQueryInfo->type); + pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b77db69c46..b4d3bec958 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -476,19 +476,21 @@ typedef struct { int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx + int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. + int16_t prjOrder; // global order in super table projection query. int64_t limit; int64_t offset; uint32_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t tagNameRelType; // relation of tag criteria and tbname criteria - int16_t fillType; // interpolate type - uint64_t fillVal; // default value array list + int16_t fillType; // interpolate type + uint64_t fillVal; // default value array list int32_t secondStageOutput; - int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed - int32_t tsLen; // total length of ts comp block - int32_t tsNumOfBlocks; // ts comp block numbers - int32_t tsOrder; // ts comp block order - int32_t numOfTags; // number of tags columns involved + int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed + int32_t tsLen; // total length of ts comp block + int32_t tsNumOfBlocks; // ts comp block numbers + int32_t tsOrder; // ts comp block order + int32_t numOfTags; // number of tags columns involved SColumnInfo colList[]; } SQueryTableMsg; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 71fc01923a..b73f7ce3f5 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -83,16 +83,15 @@ typedef struct SResultRec { int32_t threshold; // result size threshold in rows. } SResultRec; -typedef struct SWindowResInfo { - SResultRow** pResult; // result list - int16_t type:8; // data type for hash key - int32_t size:24; // number of result set - int32_t threshold; // threshold to halt query and return the generated results. - int32_t capacity; // max capacity - int32_t curIndex; // current start active index - int64_t startTime; // start time of the first time window for sliding query - int64_t prevSKey; // previous (not completed) sliding window start key -} SWindowResInfo; +typedef struct SResultRowInfo { + SResultRow** pResult; // result list + int16_t type:8; // data type for hash key + int32_t size:24; // number of result set + int32_t capacity; // max capacity + int32_t curIndex; // current start active index + int64_t startTime; // start time of the first time window for sliding query + int64_t prevSKey; // previous (not completed) sliding window start key +} SResultRowInfo; typedef struct SColumnFilterElem { int16_t bytes; // column length @@ -115,7 +114,7 @@ typedef struct STableQueryInfo { STimeWindow win; STSCursor cur; void* pTable; // for retrieve the page id list - SWindowResInfo windowResInfo; + SResultRowInfo windowResInfo; } STableQueryInfo; typedef struct SQueryCostInfo { @@ -179,7 +178,7 @@ typedef struct SQueryRuntimeEnv { uint16_t* offset; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; - SWindowResInfo windowResInfo; + SResultRowInfo windowResInfo; STSBuf* pTSBuf; STSCursor cur; SQueryCostInfo summary; @@ -190,6 +189,7 @@ typedef struct SQueryRuntimeEnv { bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation + bool queryWindowIdentical; // all query time windows are identical for all tables in one group int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file @@ -217,7 +217,8 @@ typedef struct SQInfo { STableGroupInfo tableGroupInfo; // table list SArray STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; - SArray* arrTableIdInfo; +// SArray* arrTableIdInfo; + SHashObj* arrTableIdInfo; int32_t groupIndex; /* diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 8b84ac0182..fb71c8a5fe 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -30,19 +30,19 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); -int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int32_t threshold, int16_t type); +int32_t initWindowResInfo(SResultRowInfo* pWindowResInfo, int32_t size, int16_t type); -void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo); -void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); +void cleanupTimeWindowInfo(SResultRowInfo* pWindowResInfo); +void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pWindowResInfo); void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); -void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); -void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); -void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); +int32_t numOfClosedTimeWindow(SResultRowInfo* pWindowResInfo); +void closeTimeWindow(SResultRowInfo* pWindowResInfo, int32_t slot); +void closeAllTimeWindow(SResultRowInfo* pWindowResInfo); +void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order); -static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) { +static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int32_t slot) { assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]; } @@ -50,7 +50,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int #define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) -bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); +bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot); int32_t initResultRow(SResultRow *pResultRow); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ec70280c7f..0c07149e8e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -196,6 +196,9 @@ static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* static int32_t checkForQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); +static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type); +static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery); +static STableIdInfo createTableIdInfo(SQuery* pQuery); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { @@ -458,7 +461,7 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis return true; } -static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, +static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); int32_t *p1 = @@ -515,7 +518,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin } // get the correct time window according to the handled timestamp -static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { +static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { STimeWindow w = {0}; if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value @@ -608,7 +611,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -641,7 +644,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes return TSDB_CODE_SUCCESS; } -static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { +static bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]->closed; } @@ -660,7 +663,7 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } -static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { +static bool resultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) { assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { return pResult->startInterp == true; @@ -700,7 +703,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se /** * NOTE: the query status only set for the first scan of master scan. */ -static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { +static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; if (pRuntimeEnv->scanFlag != MASTER_SCAN) { return pWindowResInfo->size; @@ -758,7 +761,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; // the number of completed slots are larger than the threshold, return current generated results to client. - if (numOfClosed > pWindowResInfo->threshold) { + if (numOfClosed > pQuery->rec.threshold) { qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold); @@ -989,7 +992,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas if (functionId == TSDB_FUNC_ARITHM) { sas->pArithExpr = &pQuery->pExpr1[col]; - sas->offset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); + sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1); sas->colList = pQuery->colList; sas->numOfCols = pQuery->numOfCols; sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); @@ -1032,85 +1035,89 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas return dataBlock; } -// window start key interpolation -static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - TSKEY start = tsCols[pos]; - TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; - TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1]; - - // if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation - if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs && - win->skey < start) { - - for (int32_t k = 0; k < pQuery->numOfCols; ++k) { - SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); - if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); - continue; - } - - double v1 = 0, v2 = 0, v = 0; - - char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : ((char*)pColInfo->pData) + (pos - 1) * pColInfo->info.bytes; - - GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); - GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); - - SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; - SPoint point2 = (SPoint){.key = start, .val = &v2}; - SPoint point = (SPoint){.key = win->skey, .val = &v}; - taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); - pRuntimeEnv->pCtx[k].start.key = point.key; - pRuntimeEnv->pCtx[k].start.val = v; +static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) { + if (type == RESULT_ROW_START_INTERP) { + for (int32_t k = 0; k < numOfOutput; ++k) { + pCtx[k].start.key = INT64_MIN; } - - return true; } else { - for (int32_t k = 0; k < pQuery->numOfCols; ++k) { - pRuntimeEnv->pCtx[k].start.key = INT64_MIN; + for (int32_t k = 0; k < numOfOutput; ++k) { + pCtx[k].end.key = INT64_MIN; } - - return false; } } -static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) { +//static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) { +// TSKEY start = tsCols[rowIndex]; +// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step]; +// +// double v1 = 0, v2 = 0, v = 0; +// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes; +// +// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); +// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes); +// +// SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; +// SPoint point2 = (SPoint){.key = start, .val = &v2}; +// SPoint point = (SPoint){.key = key, .val = &v}; +// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); +// +// return v; +//} + +// window start key interpolation +static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { SQuery* pQuery = pRuntimeEnv->pQuery; - TSKEY trueEndKey = tsCols[pos]; - if (win->ekey < ekey && win->ekey != trueEndKey) { - int32_t nextIndex = pos + 1; - TSKEY next = tsCols[nextIndex]; - - for (int32_t k = 0; k < pQuery->numOfCols; ++k) { - SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); - if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP && - pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - continue; - } - - double v1 = 0, v2 = 0, v = 0; - GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); - GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes); - - SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1}; - SPoint point2 = (SPoint){.key = next, .val = &v2}; - SPoint point = (SPoint){.key = win->ekey, .val = &v}; - taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); - pRuntimeEnv->pCtx[k].end.key = point.key; - pRuntimeEnv->pCtx[k].end.val = v; - } + TSKEY curTs = tsCols[pos]; + TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; + // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. + // start exactly from this point, no need to do interpolation + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; + if (key == curTs) { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); return true; - } else { // current time window does not ended in current data block, do nothing - for (int32_t k = 0; k < pQuery->numOfCols; ++k) { - pRuntimeEnv->pCtx[k].end.key = INT64_MIN; - } + } + if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + return true; + } + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))? + lastTs:tsCols[pos - step]; + + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); + return true; +} + +static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY actualEndKey = tsCols[endRowIndex]; + + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey; + + // not ended in current data block, do not invoke interpolation + if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQuery)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQuery))) { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); return false; } + + // there is actual end point of current time window, no interpolation need + if (key == actualEndKey) { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); + return true; + } + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + int32_t nextRowIndex = endRowIndex + step; + assert(nextRowIndex >= 0); + + TSKEY nextKey = tsCols[nextRowIndex]; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP); + return true; } static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) { @@ -1119,10 +1126,10 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* } SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; for (int32_t k = 0; k < pQuery->numOfCols; ++k) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); - memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)), - pColInfo->info.bytes); + memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); } } @@ -1150,7 +1157,7 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY * such as count/min/max etc. */ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { + SResultRowInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); @@ -1174,11 +1181,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + int32_t prevIndex = curTimeWindowIndex(pWindowResInfo); + TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); bool hasTimeWindow = false; SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); if (ret != TSDB_CODE_SUCCESS) { tfree(sasArray); @@ -1193,20 +1202,47 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); + // prev time window not interpolation yet. + int32_t curIndex = curTimeWindowIndex(pWindowResInfo); + if (prevIndex != -1 && prevIndex < curIndex) { + for(int32_t j = prevIndex; j < curIndex; ++j) { + SResultRow *pRes = pWindowResInfo->pResult[j]; + + STimeWindow w = pRes->win; + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult); + assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + + int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows); + } + + // restore current time window + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); + assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code + } + // window start key interpolation if (pRuntimeEnv->timeWindowInterpo) { - bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win); + bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + if (!done) { + int32_t startRowIndex = pQuery->pos; + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } - alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, - pDataBlockInfo->window.ekey, &win); + done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); + if (!done) { + int32_t endRowIndex = pQuery->pos + (forwardStep - 1) * step; + + TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey; + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } @@ -1243,17 +1279,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // window start(end) key interpolation if (pRuntimeEnv->timeWindowInterpo) { - bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); + bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + if (!done) { + int32_t startRowIndex = startPos; + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &nextWin); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } - alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); + done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); + if (!done) { + int32_t endRowIndex = startPos + (forwardStep - 1)*step; + TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey; + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &nextWin); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } @@ -1459,8 +1498,84 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return true; } +void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if (functionId != TSDB_FUNC_TWA) { + pRuntimeEnv->pCtx[k].start.key = INT64_MIN; + continue; + } + + SColIndex* pColIndex = &pQuery->pExpr1[k].base.colInfo; + int16_t index = pColIndex->colIndex; + SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index); + + assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey); + double v1 = 0, v2 = 0, v = 0; + + if (prevRowIndex == -1) { + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]); + } else { + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); + } + + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; + SPoint point2 = (SPoint){.key = curTs, .val = &v2}; + SPoint point = (SPoint){.key = windowKey, .val = &v}; + taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); + + if (type == RESULT_ROW_START_INTERP) { + pRuntimeEnv->pCtx[k].start.key = point.key; + pRuntimeEnv->pCtx[k].start.val = v; + } else { + pRuntimeEnv->pCtx[k].end.key = point.key; + pRuntimeEnv->pCtx[k].end.val = v; + } + } +} + +static void setTimeWindowSKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + if (!done) { + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; + if (key == ts) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } else if (prevTs != INT64_MIN && ((QUERY_IS_ASC_QUERY(pQuery) && prevTs < key) || (!QUERY_IS_ASC_QUERY(pQuery) && prevTs > key))) { + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + } + + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + pRuntimeEnv->pCtx[k].size = 1; + } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + } +} + +static void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_END_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + pRuntimeEnv->pCtx[i].size = 0; + } +} + static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { + SResultRowInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); @@ -1489,6 +1604,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); + pCtx[k].size = 1; } // set the input column data @@ -1508,7 +1624,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } int32_t offset = -1; -// TSKEY prev = -1; + TSKEY prevTs = *(TSKEY*) pRuntimeEnv->prevRow[0]; + int32_t prevRowIndex = -1; for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) { offset = GET_COL_DATA_POS(pQuery, j, step); @@ -1530,7 +1647,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // interval window query, decide the time window according to the primary timestamp if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - int64_t ts = tsCols[offset]; + int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo); + int64_t ts = tsCols[offset]; + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); bool hasTimeWindow = false; @@ -1543,27 +1662,35 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (!hasTimeWindow) { continue; } -/* + // window start key interpolation if (pRuntimeEnv->timeWindowInterpo) { - bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + // check for the time window end time interpolation + int32_t curIndex = curTimeWindowIndex(pWindowResInfo); + if (prevWindowIndex != -1 && prevWindowIndex < curIndex) { + for (int32_t k = prevWindowIndex; k < curIndex; ++k) { + SResultRow *pRes = pWindowResInfo->pResult[k]; + + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &pRes->win, masterScan, &hasTimeWindow, &pResult); + assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + + setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win); + + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + doRowwiseApplyFunctions(pRuntimeEnv, closed, &pRes->win, offset); + } + + // restore current time window + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, + &pResult); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + continue; } } - alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, - pDataBlockInfo->window.ekey, &win); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - } - } + setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win); } -*/ + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset); @@ -1588,26 +1715,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } if (hasTimeWindow) { -/* - // window start(end) key interpolation - if (pRuntimeEnv->timeWindowInterpo) { - bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - } - } - - alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - if (!alreadyInterp) { - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - } - } - } -*/ + setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin); closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset); } @@ -1633,7 +1741,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } -// prev = tsCols[offset]; + prevTs = tsCols[offset]; + prevRowIndex = offset; if (pRuntimeEnv->pTSBuf != NULL) { // if timestamp filter list is empty, quit current query @@ -1672,7 +1781,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQInfo = pQuery->current; - SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); @@ -2496,7 +2605,7 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { +int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { SQuery *pQuery = pRuntimeEnv->pQuery; *status = BLK_DATA_NO_NEEDED; @@ -2674,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa pQuery->rec.capacity = capacity; } +// TODO merge with enuserOutputBufferSimple static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; @@ -2718,7 +2828,7 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) { STimeWindow w = TSWINDOW_INITIALIZER; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (QUERY_IS_ASC_QUERY(pQuery)) { getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w); @@ -3082,14 +3192,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) return -1; } - SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; + SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); TSKEY leftTimestamp = GET_INT64_VAL(b1); - SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; + SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); @@ -3329,7 +3439,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { int32_t pos = pTree->pNode[0].index; - SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; + SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); @@ -3477,17 +3587,9 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * // order has changed already int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - // TODO validate the assertion -// if (!QUERY_IS_ASC_QUERY(pQuery)) { -// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step); -// } else { -// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step); -// } - if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) { // do nothing, no results - } else { + } else {// NOTE: even win.skey != lastKey, the results may not generated. pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step; } @@ -3501,7 +3603,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1; } -static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t order) { +static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order) { SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { @@ -3533,7 +3635,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { int32_t order = pQuery->order.order; // group by normal columns and interval query on normal table - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); } else { // for simple result of table query, @@ -3724,7 +3826,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { bool toContinue = false; if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *pResult = getResultRow(pWindowResInfo, i); @@ -3809,13 +3911,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -3884,6 +3980,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { // do nothing if no data blocks are found qualified during scan if (qstatus.lastKey != pTableQueryInfo->lastKey) { qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step; + } else { // the lastkey does not increase, which means no data checked yet + qDebug("QInfo:%p no results generated in this scan", pQInfo); } qstatus.lastKey = pTableQueryInfo->lastKey; @@ -3898,18 +3996,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { break; } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, qstatus.curWindow); - if (pRuntimeEnv->pSecQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } + STsdbQueryCond cond = createTsdbQueryCond(pQuery); restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { @@ -3947,7 +4038,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol) { closeAllTimeWindow(pWindowResInfo); } @@ -4003,9 +4094,8 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void // set more initial size of interval/groupby query if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { - int32_t initialSize = 16; - int32_t initialThreshold = 100; - int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, initialThreshold, TSDB_DATA_TYPE_INT); + int32_t initialSize = 128; + int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4032,7 +4122,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; @@ -4200,7 +4290,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { * operations involve. */ STimeWindow w = TSWINDOW_INITIALIZER; - SWindowResInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo; + SResultRowInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo; TSKEY sk = MIN(win.skey, win.ekey); TSKEY ek = MAX(win.skey, win.ekey); @@ -4244,7 +4334,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { return loadPrimaryTS; } -static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) { +static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4321,7 +4411,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ * @param pQInfo * @param result */ -void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) { +void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; @@ -4360,7 +4450,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc SQuery * pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; - SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; + SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { @@ -4434,16 +4524,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } - int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); + int32_t numOfTables = (int32_t) taosHashGetSize(pQInfo->arrTableIdInfo); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); - for(int32_t i = 0; i < numOfTables; i++) { - STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i); + + STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL); + while(item) { STableIdInfo* pDst = (STableIdInfo*)data; - pDst->uid = htobe64(pSrc->uid); - pDst->tid = htonl(pSrc->tid); - pDst->key = htobe64(pSrc->key); + pDst->uid = htobe64(item->uid); + pDst->tid = htonl(item->tid); + pDst->key = htobe64(item->key); + data += sizeof(STableIdInfo); + item = taosHashIterate(pQInfo->arrTableIdInfo, item); } // Check if query is completed or not for stable query or normal table query respectively. @@ -4605,7 +4698,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; assert(pQuery->limit.offset == 0); STimeWindow tw = *win; @@ -4655,7 +4748,23 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { SQuery *pQuery = pRuntimeEnv->pQuery; - *start = pQuery->current->lastKey; + + // get the first unclosed time window + bool assign = false; + for(int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { + if (pRuntimeEnv->windowResInfo.pResult[i]->closed) { + continue; + } + + assign = true; + *start = pRuntimeEnv->windowResInfo.pResult[i]->win.skey; + } + + if (!assign) { + *start = pQuery->current->lastKey; + } + + assert(*start <= pQuery->current->lastKey); // if queried with value filter, do NOT forward query start position if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->pFillInfo != NULL) { @@ -4671,7 +4780,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { STimeWindow w = TSWINDOW_INITIALIZER; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; STableQueryInfo *pTableQueryInfo = pQuery->current; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; @@ -4770,13 +4879,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return TSDB_CODE_SUCCESS; } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); if (!isSTableQuery && (pQInfo->tableqinfoGroupInfo.numOfTables == 1) @@ -4893,20 +4996,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { int16_t type = TSDB_DATA_TYPE_NULL; - int32_t threshold = 0; - if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - threshold = 4000; } else { type = TSDB_DATA_TYPE_INT; // group id - threshold = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); - if (threshold < 8) { - threshold = 8; - } } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, threshold, type); + code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4926,7 +5022,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_TIMESTAMP; } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, 1024, type); + code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4984,6 +5080,20 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa } } +static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTableQueryInfo) { + if (QUERY_IS_ASC_QUERY(pQuery)) { + assert( + (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && + (pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && + (pTableQueryInfo->win.skey >= pQuery->window.skey && pTableQueryInfo->win.ekey <= pQuery->window.ekey)); + } else { + assert( + (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) && + (pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && + (pTableQueryInfo->win.skey <= pQuery->window.skey && pTableQueryInfo->win.ekey >= pQuery->window.ekey)); + } +} + static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -5010,17 +5120,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { } pQuery->current = *pTableQueryInfo; - if (QUERY_IS_ASC_QUERY(pQuery)) { - assert( - ((*pTableQueryInfo)->win.skey <= (*pTableQueryInfo)->win.ekey) && - ((*pTableQueryInfo)->lastKey >= (*pTableQueryInfo)->win.skey) && - ((*pTableQueryInfo)->win.skey >= pQuery->window.skey && (*pTableQueryInfo)->win.ekey <= pQuery->window.ekey)); - } else { - assert( - ((*pTableQueryInfo)->win.skey >= (*pTableQueryInfo)->win.ekey) && - ((*pTableQueryInfo)->lastKey <= (*pTableQueryInfo)->win.skey) && - ((*pTableQueryInfo)->win.skey <= pQuery->window.skey && (*pTableQueryInfo)->win.ekey >= pQuery->window.ekey)); - } + doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); if (!pRuntimeEnv->groupbyNormalCol) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); @@ -5169,6 +5269,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { return true; } +STsdbQueryCond createTsdbQueryCond(SQuery* pQuery) { + STsdbQueryCond cond = { + .colList = pQuery->colList, + .order = pQuery->order.order, + .numOfCols = pQuery->numOfCols, + }; + + TIME_WINDOW_COPY(cond.twindow, pQuery->window); + return cond; +} + +static STableIdInfo createTableIdInfo(SQuery* pQuery) { + assert(pQuery != NULL && pQuery->current != NULL); + + STableIdInfo tidInfo; + STableId* id = TSDB_TABLEID(pQuery->current->pTable); + + tidInfo.uid = id->uid; + tidInfo.tid = id->tid; + tidInfo.key = pQuery->current->lastKey; + + return tidInfo; +} + +static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) { + STableIdInfo tidInfo = createTableIdInfo(pQuery); + STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); + if (idinfo != NULL) { + assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid); + idinfo->key = tidInfo.key; + } else { + taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); + } +} + /** * super table query handler * 1. super table projection query, group-by on normal columns query, ts-comp query @@ -5188,18 +5323,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); - qDebug("QInfo:%p last_row query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, pQInfo->groupIndex, - numOfGroups, group); - - STsdbQueryCond cond = { - .colList = pQuery->colList, - .order = pQuery->order.order, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, + pQInfo->groupIndex, numOfGroups, group); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayClone(group); @@ -5223,14 +5351,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { initCtxOutputBuf(pRuntimeEnv); - SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); + SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); taosArrayDestroy(s); // here we simply set the first table as current table - SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); + SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); pQuery->current = taosArrayGetP(first, 0); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); @@ -5252,19 +5380,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } } - } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query + } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); - qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, numOfGroups); + qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, + numOfGroups); - STsdbQueryCond cond = { - .colList = pQuery->colList, - .order = pQuery->order.order, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayClone(group); @@ -5287,7 +5410,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { longjmp(pRuntimeEnv->env, terrno); } - SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); + SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); @@ -5296,26 +5419,26 @@ static void sequentialTableProcess(SQInfo *pQInfo) { scanMultiTableDataBlocks(pQInfo); pQInfo->groupIndex += 1; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - // no results generated for current group, continue to try the next group + // no results generated for current group, continue to try the next group taosArrayDestroy(s); if (pWindowResInfo->size <= 0) { continue; } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns + pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns SResultRow *pResult = pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); + SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j); pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); } } qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, - pQInfo->groupIndex); + pQInfo->groupIndex); int32_t currentGroupIndex = pQInfo->groupIndex; pQuery->rec.rows = 0; @@ -5324,16 +5447,109 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size); copyFromWindowResToSData(pQInfo, pWindowResInfo); - pQInfo->groupIndex = currentGroupIndex; //restore the group index + pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); clearClosedTimeWindow(pRuntimeEnv); break; } + } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL) { + //super table projection query with identical query time range for all tables. + SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; + resetDefaultResInfoOutputBuf(pRuntimeEnv); + + SArray *group = GET_TABLEGROUP(pQInfo, 0); + assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && + 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); + + void *pQueryHandle = pRuntimeEnv->pQueryHandle; + if (pQueryHandle == NULL) { + STsdbQueryCond con = createTsdbQueryCond(pQuery); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pQueryHandle = pRuntimeEnv->pQueryHandle; + } + + // skip blocks without load the actual data block from file if no filter condition present + // skipBlocks(&pQInfo->runtimeEnv); + // if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { + // setQueryStatus(pQuery, QUERY_COMPLETED); + // return; + // } + + bool hasMoreBlock = true; + SQueryCostInfo *summary = &pRuntimeEnv->summary; + while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { + summary->totalBlocks += 1; + + if (IS_QUERY_KILLED(pQInfo)) { + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); + STableQueryInfo **pTableQueryInfo = + (STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); + if (pTableQueryInfo == NULL) { + break; + } + + pQuery->current = *pTableQueryInfo; + doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); + + if (pRuntimeEnv->hasTagResults) { + setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb); + } + + uint32_t status = 0; + SDataStatis *pStatis = NULL; + SArray *pDataBlock = NULL; + + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, + &pStatis, &pDataBlock, &status); + if (ret != TSDB_CODE_SUCCESS) { + break; + } + + assert(status != BLK_DATA_DISCARD); + ensureOutputBuffer(pRuntimeEnv, &blockInfo); + + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); + + summary->totalRows += blockInfo.rows; + qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, + GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, + pQuery->current->lastKey); + + pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + + // the flag may be set by tableApplyFunctionsOnBlock, clear it here + CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); + + updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); + skipResults(pRuntimeEnv); + + // the limitation of output result is reached, set the query completed + if (limitResults(pRuntimeEnv)) { + setQueryStatus(pQuery, QUERY_COMPLETED); + SET_STABLE_QUERY_OVER(pQInfo); + break; + } + + // while the output buffer is full or limit/offset is applied, query may be paused here + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) { + break; + } + } + + if (!hasMoreBlock) { + setQueryStatus(pQuery, QUERY_COMPLETED); + SET_STABLE_QUERY_OVER(pQInfo); + } } else { /* - * 1. super table projection query, 2. ts-comp query - * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. + * the following two cases handled here. + * 1. ts-comp query, and 2. the super table projection query with different query time range for each table. + * If the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ if (pQInfo->groupIndex > 0) { @@ -5396,14 +5612,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; - - STableIdInfo tidInfo = {0}; - - STableId* id = TSDB_TABLEID(pQuery->current->pTable); - tidInfo.uid = id->uid; - tidInfo.tid = id->tid; - tidInfo.key = pQuery->current->lastKey; - taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); + updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { @@ -5430,31 +5639,31 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_COMPLETED); } - } - /* - * 1. super table projection query, group-by on normal columns query, ts-comp query - * 2. point interpolation query, last row query - * - * group-by on normal columns query and last_row query do NOT invoke the finalizer here, - * since the finalize stage will be done at the client side. - * - * projection query, point interpolation query do not need the finalizer. - * - * Only the ts-comp query requires the finalizer function to be executed here. - */ - if (isTSCompQuery(pQuery)) { - finalizeQueryResult(pRuntimeEnv); - } + /* + * 1. super table projection query, group-by on normal columns query, ts-comp query + * 2. point interpolation query, last row query + * + * group-by on normal columns query and last_row query do NOT invoke the finalizer here, + * since the finalize stage will be done at the client side. + * + * projection query, point interpolation query do not need the finalizer. + * + * Only the ts-comp query requires the finalizer function to be executed here. + */ + if (isTSCompQuery(pQuery)) { + finalizeQueryResult(pRuntimeEnv); + } - if (pRuntimeEnv->pTSBuf != NULL) { - pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; - } + if (pRuntimeEnv->pTSBuf != NULL) { + pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; + } - qDebug( - "QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, - pQuery->limit.offset); + qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 + " points returned, total:%" PRId64 ", offset:%" PRId64, + pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, + pQuery->rec.total, pQuery->limit.offset); + } } static void doSaveContext(SQInfo *pQInfo) { @@ -5469,13 +5678,7 @@ static void doSaveContext(SQInfo *pQInfo) { SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); // clean unused handle if (pRuntimeEnv->pSecQueryHandle != NULL) { @@ -5748,13 +5951,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - STableIdInfo tidInfo; - STableId* id = TSDB_TABLEID(pQuery->current->pTable); - - tidInfo.uid = id->uid; - tidInfo.tid = id->tid; - tidInfo.key = pQuery->current->lastKey; - taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); + STableIdInfo tidInfo = createTableIdInfo(pQuery); + taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); } if (!isTSCompQuery(pQuery)) { @@ -6076,11 +6274,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); - // pQueryMsg->interval.intervalUnit = pQueryMsg->interval.intervalUnit; - // pQueryMsg->interval.slidingUnit = pQueryMsg->interval.slidingUnit; - // pQueryMsg->interval.offsetUnit = pQueryMsg->interval.offsetUnit; pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->offset = htobe64(pQueryMsg->offset); + pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit); pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->orderColId = htons(pQueryMsg->orderColId); @@ -6752,8 +6948,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } - int tableIndex = 0; - pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo)); @@ -6775,7 +6969,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // NOTE: pTableCheckInfo need to update the query time range and the lastKey info - pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); + pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->rspContext = NULL; pthread_mutex_init(&pQInfo->lock, NULL); @@ -6785,10 +6979,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->window = pQueryMsg->window; changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); + pQInfo->runtimeEnv.queryWindowIdentical = true; STimeWindow window = pQuery->window; int32_t index = 0; - for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); @@ -6803,9 +6997,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou for(int32_t j = 0; j < s; ++j) { STableKeyInfo* info = taosArrayGet(pa, j); - void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); - window.skey = info->lastKey; + if (info->lastKey != pQuery->window.skey) { + pQInfo->runtimeEnv.queryWindowIdentical = false; + } + + void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf); if (item == NULL) { goto _cleanup; @@ -7019,7 +7216,7 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo->pBuf); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); - taosArrayDestroy(pQInfo->arrTableIdInfo); + taosHashCleanup(pQInfo->arrTableIdInfo); pQInfo->signature = 0; @@ -7399,7 +7596,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size += sizeof(int32_t); - size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); + size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 2c41d9bfc5..6c845b012f 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -43,51 +43,48 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { return size; } -int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int32_t threshold, int16_t type) { - pWindowResInfo->capacity = size; - pWindowResInfo->threshold = threshold; - - pWindowResInfo->type = type; - pWindowResInfo->curIndex = -1; - pWindowResInfo->size = 0; - pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; +int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { + pResultRowInfo->capacity = size; - pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); - if (pWindowResInfo->pResult == NULL) { + pResultRowInfo->type = type; + pResultRowInfo->curIndex = -1; + pResultRowInfo->size = 0; + pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; + + pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); + if (pResultRowInfo->pResult == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } -void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { - if (pWindowResInfo == NULL) { +void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { + if (pResultRowInfo == NULL) { return; } - if (pWindowResInfo->capacity == 0) { - assert(pWindowResInfo->pResult == NULL); + if (pResultRowInfo->capacity == 0) { + assert(pResultRowInfo->pResult == NULL); return; } - if (pWindowResInfo->type == TSDB_DATA_TYPE_BINARY || pWindowResInfo->type == TSDB_DATA_TYPE_NCHAR) { - for(int32_t i = 0; i < pWindowResInfo->size; ++i) { - tfree(pWindowResInfo->pResult[i]->key); + if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) { + for(int32_t i = 0; i < pResultRowInfo->size; ++i) { + tfree(pResultRowInfo->pResult[i]->key); } } - tfree(pWindowResInfo->pResult); + tfree(pResultRowInfo->pResult); } -void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { +void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { + if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { return; } -// assert(pWindowResInfo->size == 1); - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pWindowRes = pWindowResInfo->pResult[i]; - clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); + for (int32_t i = 0; i < pResultRowInfo->size; ++i) { + SResultRow *pWindowRes = pResultRowInfo->pResult[i]; + clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); int32_t groupIndex = 0; int64_t uid = 0; @@ -96,30 +93,30 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); } - pWindowResInfo->curIndex = -1; - pWindowResInfo->size = 0; + pResultRowInfo->curIndex = -1; + pResultRowInfo->size = 0; - pWindowResInfo->startTime = TSKEY_INITIAL_VAL; - pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; + pResultRowInfo->startTime = TSKEY_INITIAL_VAL; + pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) { + SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; + if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); + int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); assert(num >= 0 && num <= numOfClosed); - int16_t type = pWindowResInfo->type; + int16_t type = pResultRowInfo->type; int64_t uid = getResultInfoUId(pRuntimeEnv); char *key = NULL; int16_t bytes = -1; for (int32_t i = 0; i < num; ++i) { - SResultRow *pResult = pWindowResInfo->pResult[i]; + SResultRow *pResult = pResultRowInfo->pResult[i]; if (pResult->closed) { // remove the window slot from hash table getResultRowKeyInfo(pResult, type, &key, &bytes); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); @@ -129,23 +126,23 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { } } - int32_t remain = pWindowResInfo->size - num; + int32_t remain = pResultRowInfo->size - num; // clear all the closed windows from the window list for (int32_t k = 0; k < remain; ++k) { - copyResultRow(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k], type); + copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type); } // move the unclosed window in the front of the window list - for (int32_t k = remain; k < pWindowResInfo->size; ++k) { - SResultRow *pWindowRes = pWindowResInfo->pResult[k]; - clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); + for (int32_t k = remain; k < pResultRowInfo->size; ++k) { + SResultRow *pWindowRes = pResultRowInfo->pResult[k]; + clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); } - pWindowResInfo->size = remain; + pResultRowInfo->size = remain; - for (int32_t k = 0; k < pWindowResInfo->size; ++k) { - SResultRow *pResult = pWindowResInfo->pResult[k]; + for (int32_t k = 0; k < pResultRowInfo->size; ++k) { + SResultRow *pResult = pResultRowInfo->pResult[k]; getResultRowKeyInfo(pResult, type, &key, &bytes); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); @@ -153,43 +150,43 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { assert(p != NULL); int32_t v = (*p - num); - assert(v >= 0 && v <= pWindowResInfo->size); + assert(v >= 0 && v <= pResultRowInfo->size); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); } - pWindowResInfo->curIndex = -1; + pResultRowInfo->curIndex = -1; } void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { + SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; + if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); + int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); clearFirstNWindowRes(pRuntimeEnv, numOfClosed); } -int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { +int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { int32_t i = 0; - while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) { + while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { ++i; } return i; } -void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { - assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); +void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { + assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - if (pWindowResInfo->pResult[i]->closed) { + for (int32_t i = 0; i < pResultRowInfo->size; ++i) { + if (pResultRowInfo->pResult[i]->closed) { continue; } - pWindowResInfo->pResult[i]->closed = true; + pResultRowInfo->pResult[i]->closed = true; } } @@ -198,41 +195,41 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * NOTE: remove redundant, only when the result set order equals to traverse order */ -void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { - assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); - if (pWindowResInfo->size <= 1) { +void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { + assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); + if (pResultRowInfo->size <= 1) { return; } // get the result order - int32_t resultOrder = (pWindowResInfo->pResult[0]->win.skey < pWindowResInfo->pResult[1]->win.skey)? 1:-1; + int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1; if (order != resultOrder) { return; } int32_t i = 0; if (order == QUERY_ASC_FORWARD_STEP) { - TSKEY ekey = pWindowResInfo->pResult[i]->win.ekey; - while (i < pWindowResInfo->size && (ekey < lastKey)) { + TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey; + while (i < pResultRowInfo->size && (ekey < lastKey)) { ++i; } } else if (order == QUERY_DESC_FORWARD_STEP) { - while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i]->win.skey > lastKey)) { + while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) { ++i; } } - if (i < pWindowResInfo->size) { - pWindowResInfo->size = (i + 1); + if (i < pResultRowInfo->size) { + pResultRowInfo->size = (i + 1); } } -bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { - return (getResultRow(pWindowResInfo, slot)->closed == true); +bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { + return (getResultRow(pResultRowInfo, slot)->closed == true); } -void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { - getResultRow(pWindowResInfo, slot)->closed = true; +void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { + getResultRow(pResultRowInfo, slot)->closed = true; } void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) { diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 3ac54eedd8..0e3e0d3e24 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); */ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { - if (capacity == 0 || fn == NULL) { - return NULL; + assert(fn != NULL); + if (capacity == 0) { + capacity = 4; } SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); diff --git a/tests/script/general/parser/first_last.sim b/tests/script/general/parser/first_last.sim index a934d3bcab..773f92afcf 100644 --- a/tests/script/general/parser/first_last.sim +++ b/tests/script/general/parser/first_last.sim @@ -46,7 +46,8 @@ while $i < $tbNum endw $i = $i + 1 -endw +endw + $ts = $ts + 60000 $tb = $tbPrefix . 0 sql insert into $tb (ts) values ( $ts ) @@ -84,4 +85,43 @@ sleep 500 run general/parser/first_last_query.sim +print =================> insert data regression test +sql create database test keep 36500 +sql use test +sql create table tm0 (ts timestamp, k int) + +print =========================> td-2298 +$ts0 = 1537146000000 +$xs = 6000 + +$x = 0 +while $x < 5000 + $ts = $ts0 + $xs + $ts1 = $ts + $xs + $x1 = $x + 1 + + sql insert into tm0 values ( $ts , $x ) ( $ts1 , $x1 ) + $x = $x1 + $ts0 = $ts1 +endw + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +sleep 3000 +system sh/exec.sh -n dnode1 -s start +print ================== server restart completed +sql connect +sleep 500 + +sql use test +sql select count(*), last(ts) from tm0 interval(1s) +if $rows != 10000 then + print expect 10000, actual: $rows + return -1 +endi + +sql select last(ts) from tm0 interval(1s) +if $rows != 10000 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/first_last_query.sim b/tests/script/general/parser/first_last_query.sim index a982f10362..52d888b04d 100644 --- a/tests/script/general/parser/first_last_query.sim +++ b/tests/script/general/parser/first_last_query.sim @@ -266,4 +266,6 @@ endi if $data14 != @test2@ then print expect test2 , actual: $data14 return -1 -endi \ No newline at end of file +endi + +sql drop table stest \ No newline at end of file diff --git a/tests/script/general/parser/function.sim b/tests/script/general/parser/function.sim new file mode 100644 index 0000000000..34e9844f71 --- /dev/null +++ b/tests/script/general/parser/function.sim @@ -0,0 +1,228 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 0 +system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3 +system sh/exec.sh -n dnode1 -s start +sleep 500 +sql connect + +$dbPrefix = m_func_db +$tbPrefix = m_func_tb +$mtPrefix = m_func_mt + +$tbNum = 10 +$rowNum = 5 +$totalNum = $tbNum * $rowNum +$ts0 = 1537146000000 +$delta = 600000 +print ========== alter.sim +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +sql drop database if exists $db +sql create database $db +sql use $db + +print =====================================> test case for twa in single block + +sql create table t1 (ts timestamp, k float); +sql insert into t1 values('2015-08-18 00:00:00', 2.064); +sql insert into t1 values('2015-08-18 00:06:00', 2.116); +sql insert into t1 values('2015-08-18 00:12:00', 2.028); +sql insert into t1 values('2015-08-18 00:18:00', 2.126); +sql insert into t1 values('2015-08-18 00:24:00', 2.041); +sql insert into t1 values('2015-08-18 00:30:00', 2.051); + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:05:00' +if $rows != 1 then + return -1 +endi + +if $data00 != 2.063999891 then + return -1 +endi + +if $data01 != 2.063999891 then + return -1 +endi + +if $data02 != 1 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00' +if $rows != 1 then + return -1 +endi + +if $data00 != 2.089999914 then + return -1 +endi + +if $data01 != 2.089999914 then + return -1 +endi + +if $data02 != 2 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00' interval(1m) order by ts asc +if $rows != 2 then + return -1 +endi + +if $data00 != @15-08-18 00:00:00.000@ then + return -1 +endi + +if $data01 != 2.068333156 then + return -1 +endi + +if $data02 != 2.063999891 then + return -1 +endi + +if $data03 != 1 then + return -1 +endi + +if $data10 != @15-08-18 00:06:00.000@ then + return -1 +endi + +if $data11 != 2.115999937 then + return -1 +endi + +if $data12 != 2.115999937 then + return -1 +endi + +if $data13 != 1 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00' interval(1m) order by ts desc; +if $rows != 2 then + return -1 +endi + +if $data00 != @15-08-18 00:06:00.00@ then + return -1 +endi + +if $data01 != 2.115999937 then + return -1 +endi + +if $data02 != 2.115999937 then + return -1 +endi + +if $data03 != 1 then + return -1 +endi + +if $data11 != 2.068333156 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:27:00' interval(10m) order by ts asc +if $rows != 3 then + return -1 +endi + +if $data01 != 2.088666666 then + return -1 +endi + +if $data02 != 2.089999914 then + return -1 +endi + +if $data03 != 2 then + return -1 +endi + +if $data11 != 2.077099980 then + return -1 +endi + +if $data12 != 2.077000022 then + return -1 +endi + +if $data13 != 2 then + return -1 +endi + +if $data21 != 2.069333235 then + return -1 +endi + +if $data22 != 2.040999889 then + return -1 +endi + +if $data23 != 1 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:27:00' interval(10m) order by ts desc +if $rows != 3 then + return -1 +endi + +if $data01 != 2.069333235 then + return -1 +endi + +if $data11 != 2.077099980 then + return -1 +endi + +if $data21 != 2.088666666 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' order by ts asc +if $data00 != 2.073699975 then + return -1 +endi + +if $data01 != 2.070999980 then + return -1 +endi + +if $data02 != 6 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' order by ts desc +if $rows != 1 then + return -1 +endi + +if $data00 != 2.073699975 then + return -1 +endi + +if $data01 != 2.070999980 then + return -1 +endi + +if $data02 != 6 then + return -1 +endi + +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts asc +sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts desc + + +#todo add test case while column filte exists. + +select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s)