From dd3174df376c1b3c25da7c5eb437bfc48eb00d0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 Mar 2021 17:29:55 +0800 Subject: [PATCH] [td-2895] refactor. --- src/client/src/tscServer.c | 2 - src/client/src/tscSubquery.c | 1 + src/inc/tsdb.h | 4 +- src/query/inc/qExecutor.h | 12 +- src/query/src/qAggMain.c | 15 +- src/query/src/qExecutor.c | 634 ++++++++++++++++++++++++++--------- src/query/src/queryMain.c | 5 +- src/tsdb/src/tsdbRead.c | 62 +++- 8 files changed, 550 insertions(+), 185 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1188925600..d8e5fc88b5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -494,8 +494,6 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command >= TSDB_SQL_LOCAL) { - //pSql->epSet = tscMgmtEpSet; -// } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d9cfd7d8b3..5df2dc4aba 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -497,6 +497,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0); int16_t funcId = pExpr->functionId; + // add the invisible timestamp column if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) || (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7855ff6a79..bb27cc7390 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -307,7 +307,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const SColIndex *pColIndex, int32_t numOfCols); /** - * destory the created table group list, which is generated by tag query + * destroy the created table group list, which is generated by tag query * @param pGroupList */ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); @@ -339,6 +339,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond); +void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList); + /** * get the statistics of repo usage * @param repo. point to the tsdbrepo diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index e897617cd5..6c3e175856 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -241,12 +241,16 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; +typedef struct { + FILE* file; // file struct pointer +} SFileResultInfo; + typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; void* qinfo; uint16_t scanFlag; // denotes reversed scan of data or not - SFillInfo* pFillInfo; + SFillInfo* pFillInfo; // todo move to operatorInfo void* pQueryHandle; int32_t prevGroupId; // previous executed group id @@ -264,10 +268,10 @@ typedef struct SQueryRuntimeEnv { SArithmeticSupport *sasArray; SSDataBlock *outputBuf; - int32_t groupIndex; - int32_t tableIndex; + int32_t tableIndex; //TODO remove it STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo *proot; + struct SOperatorInfo *pTableScanner; // table scan operator SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value @@ -364,6 +368,8 @@ typedef struct STableScanInfo { int32_t numOfOutput; int64_t elapsedTime; + + int32_t tableIndex; } STableScanInfo; typedef struct STagScanInfo { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index ef6da984c6..44108e895c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -208,7 +208,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == TSDB_FUNC_TS_COMP) { *type = TSDB_DATA_TYPE_BINARY; - *bytes = sizeof(int32_t); // this results is compressed ts data + *bytes = 1; // this results is compressed ts data, only one byte *interBytes = POINTER_BYTES; return TSDB_CODE_SUCCESS; } @@ -4203,11 +4203,22 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { STSBuf * pTSbuf = pInfo->pTSBuf; tsBufFlush(pTSbuf); - + qDebug("total timestamp :%"PRId64, pTSbuf->numOfTotal); + + // TODO refactor transfer ownership of current file *(FILE **)pCtx->pOutput = pTSbuf->f; + pResInfo->complete = true; + + // get the file size + struct stat fStat; + if ((fstat(fileno(pTSbuf->f), &fStat) == 0)) { + pResInfo->numOfRes = fStat.st_size; + } + pTSbuf->remainOpen = true; tsBufDestroy(pTSbuf); + doFinalizer(pCtx); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4ed79baf8b..4abbb111d8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -177,6 +177,8 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery); static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); + static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static int32_t getNumOfScanTimes(SQuery* pQuery); @@ -1178,8 +1180,6 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { -// setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); - int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { pCtx[k].startTs = startTs;// this can be set during create the struct @@ -1197,6 +1197,172 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } } +void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, + SArray *pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, + int32_t type) { + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + SExprInfo* pExpr = pOperator->pExpr; + + SQLFunctionCtx* pCtx = pInfo->pCtx; + + for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + int32_t functionId = pCtx[k].functionId; + if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) { + pCtx[k].start.key = INT64_MIN; + continue; + } + + SColIndex * pColIndex = &pExpr[k].base.colInfo; + int16_t index = pColIndex->colIndex; + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, index); + + assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey); + double v1 = 0, v2 = 0, v = 0; + + if (prevRowIndex == -1) { + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[index]); + } else { + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); + } + + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; + SPoint point2 = (SPoint){.key = curTs, .val = &v2}; + SPoint point = (SPoint){.key = windowKey, .val = &v}; + + if (functionId == TSDB_FUNC_TWA) { + taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); + + if (type == RESULT_ROW_START_INTERP) { + pCtx[k].start.key = point.key; + pCtx[k].start.val = v; + } else { + pCtx[k].end.key = point.key; + pCtx[k].end.val = v; + } + } else { + if (type == RESULT_ROW_START_INTERP) { + pCtx[k].start.key = prevTs; + pCtx[k].start.val = v1; + + pCtx[k].end.key = curTs; + pCtx[k].end.val = v2; + } + } + } +} + +static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, + int32_t pos, int32_t numOfRows, + SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) { + SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY curTs = tsCols[pos]; + TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; + + // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. + // start exactly from this point, no need to do interpolation + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; + if (key == curTs) { + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + return true; + } + + if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) { + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + return true; + } + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))? + lastTs:tsCols[pos - step]; + + doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); + return true; +} + +static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, + int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { + SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t numOfOutput = pOperatorInfo->numOfOutput; + + TSKEY actualEndKey = tsCols[endRowIndex]; + + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey; + + // not ended in current data block, do not invoke interpolation + if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQuery)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQuery))) { + setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP); + return false; + } + + // there is actual end point of current time window, no interpolation need + if (key == actualEndKey) { + setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP); + return true; + } + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + int32_t nextRowIndex = endRowIndex + step; + assert(nextRowIndex >= 0); + + TSKEY nextKey = tsCols[nextRowIndex]; + doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP); + return true; +} + +static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SQLFunctionCtx* pCtx, + SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + if (!pQuery->timeWindowInterpo) { + return; + } + + assert(pBlock != NULL); + + int32_t fillType = pQuery->fillType; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + + TSKEY *tsCols = (TSKEY *)(pColInfo->pData); + bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + if (!done) { // it is not interpolated, now start to generated the interpolated value + int32_t startRowIndex = startPos; + bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, pBlock->pDataBlock, + tsCols, win, fillType); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } else { + setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + } + + // point interpolation does not require the end key time window interpolation. + if (isPointInterpoQuery(pQuery)) { + return; + } + + // interpolation query does not generate the time window end interpolation + done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); + if (!done) { + int32_t endRowIndex = startPos + (forwardStep - 1) * step; + + TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey:pBlock->info.window.skey; + bool interp = setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } else { + setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); + } +} + static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; @@ -1248,9 +1414,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul numOfOutput, pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - // int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; - // doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], - // -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); + doRowwiseTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], + -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); @@ -1265,8 +1431,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } // window start key interpolation - // doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, - // forwardStep); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); @@ -1290,8 +1455,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation - // doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, - // startPos, forwardStep); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); } @@ -2133,7 +2297,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) { +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -2159,13 +2323,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf goto _clean; } - char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow; - pRuntimeEnv->prevRow[0] = start; - for(int32_t i = 1; i < pQuery->numOfCols; ++i) { - pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes; - } + if (pQuery->numOfCols) { + char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow; + pRuntimeEnv->prevRow[0] = start; + for(int32_t i = 1; i < pQuery->numOfCols; ++i) { + pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes; + } - *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; + *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; + } qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); @@ -2173,11 +2339,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pQuery->stableQuery) { - pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); } else { - pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); @@ -2190,29 +2356,31 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else if (pQuery->groupbyColumn) { - pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else if (isFixedOutputQuery(pQuery)) { - if (!pQuery->stableQuery) { - pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + if (pQuery->stableQuery && !isTsCompQuery(pQuery)) { + pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); } else { - pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); } - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); - if (!onlyQueryTags(pQuery)) { - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); +// if (isTsCompQuery(pQuery)) { +// pRuntimeEnv->proot = createSeqTableBlockScanOperator(pRuntimeEnv->pTableScanner, pRuntimeEnv); + /*} else*/ if (!onlyQueryTags(pQuery)) { + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); } } @@ -2249,23 +2417,12 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); } - - static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); -// if (isTsCompQuery(pQuery)) { -// FILE *f = *(FILE **)pQuery->sdata[0]->data; -// -// if (f) { -// fclose(f); -// *(FILE **)pQuery->sdata[0]->data = NULL; -// } -// } - if (pRuntimeEnv->sasArray != NULL) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { tfree(pRuntimeEnv->sasArray[i].data); @@ -2327,7 +2484,7 @@ static bool isFixedOutputQuery(SQuery* pQuery) { } // Note:top/bottom query is fixed output query - if (pQuery->topBotQuery || pQuery->groupbyColumn) { + if (pQuery->topBotQuery || pQuery->groupbyColumn || isTsCompQuery(pQuery)) { return true; } @@ -2759,55 +2916,104 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } +static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) { + STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); -void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { +#if defined(_DEBUG_VIEW) + printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%"PRIu64", query order:%d, ts order:%d, traverse:%d, index:%d\n", + elem.ts, key, elem.tag.i64, pQuery->order.order, pRuntimeEnv->pTsBuf->tsOrder, + pRuntimeEnv->pTsBuf->cur.order, pRuntimeEnv->pTsBuf->cur.tsIndex); +#endif + + if (ascQuery) { + if (key < elem.ts) { + return TS_JOIN_TS_NOT_EQUALS; + } else if (key > elem.ts) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN); + } + } else { + if (key > elem.ts) { + return TS_JOIN_TS_NOT_EQUALS; + } else if (key < elem.ts) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN); + } + } + + return TS_JOIN_TS_EQUAL; +} + +void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo *pFilterInfo, + int32_t numOfFilterCols, SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; int8_t *p = calloc(numOfRows, sizeof(int8_t)); bool all = true; - for (int32_t i = 0; i < numOfRows; ++i) { - bool qualified = false; + if (pTsBuf != NULL) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); - for (int32_t k = 0; k < numOfFilterCols; ++k) { - char *pElem = (char *)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; + TSKEY* k = (TSKEY*) pColInfoData->pData; + for (int32_t i = 0; i < numOfRows; ++i) { + int32_t offset = ascQuery? i:(numOfRows - i - 1); + int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery); + if (ret == TS_JOIN_TAG_NOT_EQUALS) { + break; + } else if (ret == TS_JOIN_TS_NOT_EQUALS) { + all = false; + continue; + } else { + assert(ret == TS_JOIN_TS_EQUAL); + p[offset] = true; + } - qualified = false; - for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { - SColumnFilterElem *pFilterElem = &pFilterInfo[k].pFilters[j]; + if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { + break; + } + } + } else { + for (int32_t i = 0; i < numOfRows; ++i) { + bool qualified = false; - bool isnull = isNull(pElem, pFilterInfo[k].info.type); - if (isnull) { - if (pFilterElem->fp == isNullOperator) { - qualified = true; - break; + for (int32_t k = 0; k < numOfFilterCols; ++k) { + char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; + + qualified = false; + for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { + SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j]; + + bool isnull = isNull(pElem, pFilterInfo[k].info.type); + if (isnull) { + if (pFilterElem->fp == isNullOperator) { + qualified = true; + break; + } else { + continue; + } } else { - continue; + if (pFilterElem->fp == notNullOperator) { + qualified = true; + break; + } else if (pFilterElem->fp == isNullOperator) { + continue; + } } - } else { - if (pFilterElem->fp == notNullOperator) { + + if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { qualified = true; break; - } else if (pFilterElem->fp == isNullOperator) { - continue; } } - if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { - qualified = true; + if (!qualified) { break; } } + p[i] = qualified ? 1 : 0; if (!qualified) { - break; + all = false; } } - - p[i] = qualified ? 1 : 0; - if (!qualified) { - all = false; - } } if (!all) { @@ -2861,20 +3067,43 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte tfree(p); } -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, uint32_t *status) { +static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); +static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); + +//TODO refactor +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, + uint32_t *status) { *status = BLK_DATA_NO_NEEDED; - pBlock->pDataBlock = NULL; + pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery* pQuery = pRuntimeEnv->pQuery; int64_t groupId = pQuery->current->groupIndex; - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQueryCostInfo* pCost = &pQInfo->summary; - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) { + if (pRuntimeEnv->pTsBuf != NULL && pQuery->stableQuery) { + SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0]; + int16_t tagId = (int16_t)pExprInfo->base.arg->argValue.i64; + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagId); + + // compare tag first + tVariant t = {0}; + doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); + + STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); + if (tsBufIsValidElem(&elem) && tVariantCompare(&t, elem.tag) == 0) { + *status = BLK_DATA_ALL_NEEDED; + } else { + (*status) = BLK_DATA_DISCARD; + return TSDB_CODE_SUCCESS; + } + } + + if (pQuery->numOfFilterCols > 0) { *status = BLK_DATA_ALL_NEEDED; - } else { // check if this data block is required to load + } else { // check if this data block is required to load // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info)) { @@ -2887,16 +3116,17 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* if (QUERY_IS_INTERVAL_QUERY(pQuery)) { SResultRow* pResult = NULL; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.skey:pBlock->info.window.ekey; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + TSKEY k = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, - groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, + pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, + pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } } - int32_t numOfOutput = pTableScanInfo->numOfOutput; + int32_t numOfOutput = pTableScanInfo->numOfOutput; SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -2920,16 +3150,15 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* SDataBlockInfo* pBlockInfo = &pBlock->info; if ((*status) == BLK_DATA_NO_NEEDED) { - qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, + qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->discardBlocks += 1; } else if ((*status) == BLK_DATA_STATIS_NEEDED) { - // this function never returns error? pCost->loadBlockStatis += 1; tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); - if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); pCost->totalCheckedRows += pBlock->info.rows; } @@ -2941,17 +3170,17 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) { - bool load = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pTableScanInfo->pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max)); + load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockStatis[i].min), + (char*)&(pBlock->pBlockStatis[i].max)); if (!load) { // current block has been discard due to filter applied pCost->discardBlocks += 1; - qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; return TSDB_CODE_SUCCESS; } @@ -2962,7 +3191,8 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* // current block has been discard due to filter applied if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; - qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; return TSDB_CODE_SUCCESS; } @@ -2974,22 +3204,29 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* return terrno; } - if (pQuery->numOfFilterCols > 0) { + if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData == NULL) { // set the initial static data value filter expression - if (pQuery->pFilterInfo[0].pData == NULL) { - for(int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { - for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { + for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) { - pQuery->pFilterInfo[i].pData = pColInfo->pData; - break; - } + if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) { + pQuery->pFilterInfo[i].pData = pColInfo->pData; + break; } } } + } - filterDataBlock_rv(pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock); + // todo add tscomp filter + STableId* id = TSDB_TABLEID(pQuery->current->pTable); + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) { + if (id->tid == 2) { + int32_t k = 1; + printf("%d\n", k); + } + filterDataBlock_rv(pRuntimeEnv, pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock, pRuntimeEnv->pTsBuf, + QUERY_IS_ASC_QUERY(pQuery)); } } @@ -3252,8 +3489,10 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC // set the join tag for first column SSqlFuncMsg* pFuncMsg = &pExprInfo->base; - if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && - pRuntimeEnv->pTsBuf != NULL && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if (pQuery->stableQuery && + (pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && + (pRuntimeEnv->pTsBuf != NULL) && + pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { assert(pFuncMsg->numOfParams == 1); int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; @@ -5087,18 +5326,52 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); +static void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) { + SQuery* pQuery = pRuntimeEnv->pQuery; + SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); + + // handle the first table + STableQueryInfo* pCheckInfo = taosArrayGetP(group, tableIndex); + + STsdbQueryCond cond = { + .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + .loadExternalRows = false, + }; + + SArray *g1 = taosArrayInit(1, POINTER_BYTES); + SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo)); + + STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey}; + taosArrayPush(tx, &info); + + taosArrayPush(g1, &tx); + STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; + + if (pRuntimeEnv->pQueryHandle == NULL) { + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pRuntimeEnv->qinfo, &pQuery->memRef); + } else { + tsdbResetQueryHandleForNewTable(pRuntimeEnv->pQueryHandle, &cond, &gp); + } + + taosArrayDestroy(tx); + taosArrayDestroy(g1); +} + static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + // TODO set the tags scan handle if (onlyQueryTags(pQuery)) { return TSDB_CODE_SUCCESS; + } else if (isTsCompQuery(pQuery)) { + setTableQueryHandle(pRuntimeEnv, 0); + return terrno; } -// if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) { -// return TSDB_CODE_SUCCESS; -// } - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); if (!isSTableQuery @@ -5175,11 +5448,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + pQuery->tsdb = tsdb; pQuery->topBotQuery = isTopBottomQuery(pQuery); pQuery->hasTagResults = hasTagValOutput(pQuery); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); + pRuntimeEnv->prevResult = prevResult; + pRuntimeEnv->qinfo = pQInfo; setScanLimitationByResultBuffer(pQuery); @@ -5200,11 +5476,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->cur.vgroupIndex = -1; if (onlyQueryTags(pQuery)) { + pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); + } else if (isTsCompQuery(pQuery)) { + pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); } else if (needReverseScan(pQuery)) { - pRuntimeEnv->proot = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); + pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { - pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); + pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); } if (pTsBuf != NULL) { @@ -5233,7 +5512,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts // create runtime environment int32_t numOfTables = pQuery->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQuery->vgId); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6148,9 +6427,10 @@ static SSDataBlock* doTableScan(void* param) { pTableScanInfo->reverseTimes = 0; pTableScanInfo->order = cond.order; - // todo refactor, extract function - pResultRowInfo->curIndex = pResultRowInfo->size-1; - pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; + if (pResultRowInfo->size > 0) { + pResultRowInfo->curIndex = pResultRowInfo->size-1; + pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; + } SSDataBlock* p = doTableScanImpl(pTableScanInfo); if (p != NULL) { @@ -6161,6 +6441,30 @@ static SSDataBlock* doTableScan(void* param) { return NULL; } +static SSDataBlock* doSeqTableBlockScan(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + + STableScanInfo *pTableScanInfo = pOperator->info; + SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; + + int32_t totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables; + + while (1) { + SSDataBlock* p = doTableScanImpl(pTableScanInfo); + if (p != NULL) { + return p; + } + + // try the next table + if (++pTableScanInfo->tableIndex >= totalTables) { + return NULL; + } + + setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex); + pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle; + } +} + SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { assert(repeatTime > 0); @@ -6177,9 +6481,31 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pOperator->name = "SeqScanTableOp"; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; + pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; - pOperator->exec = doTableScan; + pOperator->exec = doTableScan; + + return pOperator; +} + +SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->times = 1; + pInfo->reverseTimes = 0; + pInfo->order = pRuntimeEnv->pQuery->order.order; + + pInfo->current = 0; + pInfo->pRuntimeEnv = pRuntimeEnv; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "TableBlockSeqScan"; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; + pOperator->exec = doSeqTableBlockScan; return pOperator; } @@ -6351,8 +6677,13 @@ static SSDataBlock* doSTableAggregate(void* param) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->resultRowInfo); + if (isTsCompQuery(pQuery)) { + finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + } + updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); @@ -6368,8 +6699,9 @@ static SSDataBlock* doArithmeticOperation(void* param) { SArithOperatorInfo* pArithInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - pArithInfo->binfo.pRes->info.rows = 0; + SOptrBasicInfo *pInfo = &pArithInfo->binfo; + + pInfo->pRes->info.rows = 0; while(1) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); @@ -6378,26 +6710,21 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } - setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->binfo.pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pArithInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); updateOutputBuf(pArithInfo, pBlock->info.rows); - arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput); + arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - pArithInfo->binfo.pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput); - if (pArithInfo->binfo.pRes->info.rows >= 4096) { + pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + if (pInfo->pRes->info.rows >= 4096) { break; } } - setNumOfRes(pArithInfo->binfo.pCtx, pOperator->numOfOutput); - - if (pArithInfo->binfo.pRes->info.rows > 0) { - return pArithInfo->binfo.pRes; - } else { - return NULL; - } + setNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } static SSDataBlock* doLimit(void* param) { @@ -6467,7 +6794,7 @@ static SSDataBlock* doOffset(void* param) { } } -static SSDataBlock* doHashIntervalAgg(void* param) { +static SSDataBlock* doIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -6697,7 +7024,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1); + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t numOfRows = (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); + + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6754,9 +7084,10 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1); + size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableAggregate"; @@ -6856,7 +7187,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doHashIntervalAgg; + pOperator->exec = doIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; return pOperator; @@ -7044,10 +7375,10 @@ static SSDataBlock* doTagScan(void* param) { count += 1; } - pRes->info.rows = count; qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); } + pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; } @@ -7163,18 +7494,9 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { void stableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; -// pRuntimeEnv->resultInfo.rows = 0; int64_t st = taosGetTimestampUs(); - -// if (QUERY_IS_INTERVAL_QUERY(pQuery) || -// (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { - //multiTableQueryProcess(pQInfo); - pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); -// } else { -// assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); -// sequentialTableProcess(pQInfo); -// } + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); // record the total elapsed time pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); @@ -7451,7 +7773,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); pExprMsg->colType = htons(pExprMsg->colType); pExprMsg->colBytes = htons(pExprMsg->colBytes); @@ -7941,8 +8263,9 @@ static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) { } } -SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) { +SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, + SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, + char* sql) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -8323,26 +8646,15 @@ void freeQInfo(SQInfo *pQInfo) { } size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pRuntimeEnv->pQuery; /* * get the file size and set the numOfRows to be the file size, since for tsComp query, * the returned row size is equalled to 1 * TODO handle the case that the file is too large to send back one time */ - if (isTsCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fStat; - FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; - if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) { - *numOfRows = fStat.st_size; - return fStat.st_size; - } else { - qError("QInfo:%p failed to get file info, file:%p, reason:%s", pQInfo, f, strerror(errno)); - return 0; - } - } else { - return (size_t)(pQuery->resultRowSize * (*numOfRows)); - } + return (size_t)(pQuery->resultRowSize * (*numOfRows)); } int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { @@ -8352,12 +8664,13 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // load data from file to msg buffer if (isTsCompQuery(pQuery)) { - - FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; // TODO refactor + SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0); + FILE *f = *(FILE **)pColInfoData->pData; // TODO refactor // make sure file exist if (f) { off_t s = lseek(fileno(f), 0, SEEK_END); + assert(s == pRuntimeEnv->outputBuf->info.rows); qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, (uint64_t)s); if (fseek(f, 0, SEEK_SET) >= 0) { @@ -8372,14 +8685,13 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { assert(0); } + // dump error info if (s <= (sizeof(STSBufFileHeader) + sizeof(STSGroupBlockInfo) + 6 * sizeof(int32_t))) { qDump(data, s); - assert(0); } fclose(f); -// *(FILE **)pQuery->sdata[0]->data = NULL; } // all data returned, set query over diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 266dd064ab..d78cea741f 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -312,10 +312,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - int64_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); - - size_t size = getResultSize(pQInfo, &s); + int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); + size_t size = pQuery->resultRowSize * s; size += sizeof(int32_t); size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 4934e81c16..3d70487986 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -295,8 +295,8 @@ static void resetCheckInfo(STsdbQueryHandle* pQueryHandle) { for (int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->lastKey = pQueryHandle->window.skey; - pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter); - pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter); + pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter); + pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter); pCheckInfo->initBuf = false; if (ASCENDING_TRAVERSE(pQueryHandle->order)) { @@ -453,6 +453,39 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { resetCheckInfo(pQueryHandle); } +void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) { + STsdbQueryHandle* pQueryHandle = queryHandle; + + pQueryHandle->order = pCond->order; + pQueryHandle->window = pCond->twindow; + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pQueryHandle->cur.fid = -1; + pQueryHandle->cur.win = TSWINDOW_INITIALIZER; + pQueryHandle->checkFiles = true; + pQueryHandle->activeIndex = 0; // current active table index + pQueryHandle->locateStart = false; + pQueryHandle->loadExternalRow = pCond->loadExternalRows; + + if (ASCENDING_TRAVERSE(pCond->order)) { + assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); + } else { + assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey); + } + + // allocate buffer in order to load data blocks from file + memset(pQueryHandle->statis, 0, sizeof(SDataStatis)); + + tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); + tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); + + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta); + if (pQueryHandle->pTableCheckInfo == NULL) { + tsdbCleanupQueryHandle(pQueryHandle); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + } +} + TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { pCond->twindow = updateLastrowForEachGroup(groupList); @@ -3105,23 +3138,26 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) { return NULL; } +static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { + size_t size = taosArrayGetSize(pTableCheckInfo); + for (int32_t i = 0; i < size; ++i) { + STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i); + destroyTableMemIterator(p); + + tfree(p->pCompInfo); + } + + taosArrayDestroy(pTableCheckInfo); + return NULL; +} + void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; if (pQueryHandle == NULL) { return; } - - if (pQueryHandle->pTableCheckInfo != NULL) { - size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - for (int32_t i = 0; i < size; ++i) { - STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - destroyTableMemIterator(pTableCheckInfo); - - tfree(pTableCheckInfo->pCompInfo); - } - taosArrayDestroy(pQueryHandle->pTableCheckInfo); - } + pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo); pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns); taosArrayDestroy(pQueryHandle->defaultLoadColumn);