[td-2895] refactor.

This commit is contained in:
Haojun Liao 2021-03-01 17:29:55 +08:00
parent 18112c74cb
commit dd3174df37
8 changed files with 550 additions and 185 deletions

View File

@ -494,8 +494,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
} else if (pCmd->command >= TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet;
// } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}

View File

@ -497,6 +497,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t funcId = pExpr->functionId;
// add the invisible timestamp column
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

View File

@ -307,7 +307,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const
SColIndex *pColIndex, int32_t numOfCols);
/**
* destory the created table group list, which is generated by tag query
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
@ -339,6 +339,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo

View File

@ -241,12 +241,16 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
typedef struct {
FILE* file; // file struct pointer
} SFileResultInfo;
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
void* qinfo;
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
SFillInfo* pFillInfo; // todo move to operatorInfo
void* pQueryHandle;
int32_t prevGroupId; // previous executed group id
@ -264,10 +268,10 @@ typedef struct SQueryRuntimeEnv {
SArithmeticSupport *sasArray;
SSDataBlock *outputBuf;
int32_t groupIndex;
int32_t tableIndex;
int32_t tableIndex; //TODO remove it
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot;
struct SOperatorInfo *pTableScanner; // table scan operator
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
@ -364,6 +368,8 @@ typedef struct STableScanInfo {
int32_t numOfOutput;
int64_t elapsedTime;
int32_t tableIndex;
} STableScanInfo;
typedef struct STagScanInfo {

View File

@ -208,7 +208,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_TS_COMP) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(int32_t); // this results is compressed ts data
*bytes = 1; // this results is compressed ts data, only one byte
*interBytes = POINTER_BYTES;
return TSDB_CODE_SUCCESS;
}
@ -4203,11 +4203,22 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
STSBuf * pTSbuf = pInfo->pTSBuf;
tsBufFlush(pTSbuf);
qDebug("total timestamp :%"PRId64, pTSbuf->numOfTotal);
// TODO refactor transfer ownership of current file
*(FILE **)pCtx->pOutput = pTSbuf->f;
pResInfo->complete = true;
// get the file size
struct stat fStat;
if ((fstat(fileno(pTSbuf->f), &fStat) == 0)) {
pResInfo->numOfRes = fStat.st_size;
}
pTSbuf->remainOpen = true;
tsBufDestroy(pTSbuf);
doFinalizer(pCtx);
}

View File

@ -177,6 +177,8 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(SQuery* pQuery);
@ -1178,8 +1180,6 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
// setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = startTs;// this can be set during create the struct
@ -1197,6 +1197,172 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
}
}
void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
SArray *pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey,
int32_t type) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SExprInfo* pExpr = pOperator->pExpr;
SQLFunctionCtx* pCtx = pInfo->pCtx;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) {
pCtx[k].start.key = INT64_MIN;
continue;
}
SColIndex * pColIndex = &pExpr[k].base.colInfo;
int16_t index = pColIndex->colIndex;
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, index);
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[index]);
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v};
if (functionId == TSDB_FUNC_TWA) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = point.key;
pCtx[k].start.val = v;
} else {
pCtx[k].end.key = point.key;
pCtx[k].end.val = v;
}
} else {
if (type == RESULT_ROW_START_INTERP) {
pCtx[k].start.key = prevTs;
pCtx[k].start.val = v1;
pCtx[k].end.key = curTs;
pCtx[k].end.val = v2;
}
}
}
}
static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx,
int32_t pos, int32_t numOfRows,
SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY curTs = tsCols[pos];
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
// start exactly from this point, no need to do interpolation
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey;
if (key == curTs) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
return true;
}
if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
return true;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))?
lastTs:tsCols[pos - step];
doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP);
return true;
}
static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx,
int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) {
SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
TSKEY actualEndKey = tsCols[endRowIndex];
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey;
// not ended in current data block, do not invoke interpolation
if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQuery)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQuery))) {
setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP);
return false;
}
// there is actual end point of current time window, no interpolation need
if (key == actualEndKey) {
setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP);
return true;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int32_t nextRowIndex = endRowIndex + step;
assert(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex];
doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP);
return true;
}
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SQLFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
if (!pQuery->timeWindowInterpo) {
return;
}
assert(pBlock != NULL);
int32_t fillType = pQuery->fillType;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY *tsCols = (TSKEY *)(pColInfo->pData);
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) { // it is not interpolated, now start to generated the interpolated value
int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, pBlock->pDataBlock,
tsCols, win, fillType);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
// point interpolation does not require the end key time window interpolation.
if (isPointInterpoQuery(pQuery)) {
return;
}
// interpolation query does not generate the time window end interpolation
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey:pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
}
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
@ -1248,9 +1414,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
// -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
doRowwiseTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
-1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
@ -1265,8 +1431,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
// window start key interpolation
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos,
// forwardStep);
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
@ -1290,8 +1455,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin,
// startPos, forwardStep);
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput);
}
@ -2133,7 +2297,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) {
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -2159,6 +2323,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
goto _clean;
}
if (pQuery->numOfCols) {
char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pRuntimeEnv->prevRow;
pRuntimeEnv->prevRow[0] = start;
for(int32_t i = 1; i < pQuery->numOfCols; ++i) {
@ -2166,6 +2331,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
}
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
@ -2173,11 +2339,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
@ -2190,29 +2356,31 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
} else if (pQuery->groupbyColumn) {
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (isFixedOutputQuery(pQuery)) {
if (!pQuery->stableQuery) {
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
if (pQuery->stableQuery && !isTsCompQuery(pQuery)) {
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
} else {
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
// if (isTsCompQuery(pQuery)) {
// pRuntimeEnv->proot = createSeqTableBlockScanOperator(pRuntimeEnv->pTableScanner, pRuntimeEnv);
/*} else*/ if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
}
}
@ -2249,23 +2417,12 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
}
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
qDebug("QInfo:%p teardown runtime env", pQInfo);
// if (isTsCompQuery(pQuery)) {
// FILE *f = *(FILE **)pQuery->sdata[0]->data;
//
// if (f) {
// fclose(f);
// *(FILE **)pQuery->sdata[0]->data = NULL;
// }
// }
if (pRuntimeEnv->sasArray != NULL) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pRuntimeEnv->sasArray[i].data);
@ -2327,7 +2484,7 @@ static bool isFixedOutputQuery(SQuery* pQuery) {
}
// Note:top/bottom query is fixed output query
if (pQuery->topBotQuery || pQuery->groupbyColumn) {
if (pQuery->topBotQuery || pQuery->groupbyColumn || isTsCompQuery(pQuery)) {
return true;
}
@ -2759,13 +2916,61 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
return false;
}
static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) {
#if defined(_DEBUG_VIEW)
printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%"PRIu64", query order:%d, ts order:%d, traverse:%d, index:%d\n",
elem.ts, key, elem.tag.i64, pQuery->order.order, pRuntimeEnv->pTsBuf->tsOrder,
pRuntimeEnv->pTsBuf->cur.order, pRuntimeEnv->pTsBuf->cur.tsIndex);
#endif
if (ascQuery) {
if (key < elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key > elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
}
} else {
if (key > elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key < elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
}
}
return TS_JOIN_TS_EQUAL;
}
void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo *pFilterInfo,
int32_t numOfFilterCols, SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = calloc(numOfRows, sizeof(int8_t));
bool all = true;
if (pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
break;
}
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
@ -2809,6 +3014,7 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
all = false;
}
}
}
if (!all) {
int32_t start = 0;
@ -2861,7 +3067,12 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
tfree(p);
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, uint32_t *status) {
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
//TODO refactor
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock,
uint32_t *status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
@ -2872,7 +3083,25 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* pCost = &pQInfo->summary;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
if (pRuntimeEnv->pTsBuf != NULL && pQuery->stableQuery) {
SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0];
int16_t tagId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagId);
// compare tag first
tVariant t = {0};
doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (tsBufIsValidElem(&elem) && tVariantCompare(&t, elem.tag) == 0) {
*status = BLK_DATA_ALL_NEEDED;
} else {
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
}
if (pQuery->numOfFilterCols > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
@ -2890,8 +3119,9 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult,
groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
@ -2924,7 +3154,6 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->discardBlocks += 1;
} else if ((*status) == BLK_DATA_STATIS_NEEDED) {
// this function never returns error?
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
@ -2941,17 +3170,17 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
bool load = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pTableScanInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max));
load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockStatis[i].min),
(char*)&(pBlock->pBlockStatis[i].max));
if (!load) {
// current block has been discard due to filter applied
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo,
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
@ -2962,7 +3191,8 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
// current block has been discard due to filter applied
if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
@ -2974,9 +3204,8 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
return terrno;
}
if (pQuery->numOfFilterCols > 0) {
if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData == NULL) {
// set the initial static data value filter expression
if (pQuery->pFilterInfo[0].pData == NULL) {
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j);
@ -2989,7 +3218,15 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
}
}
filterDataBlock_rv(pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock);
// todo add tscomp filter
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) {
if (id->tid == 2) {
int32_t k = 1;
printf("%d\n", k);
}
filterDataBlock_rv(pRuntimeEnv, pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock, pRuntimeEnv->pTsBuf,
QUERY_IS_ASC_QUERY(pQuery));
}
}
@ -3252,8 +3489,10 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC
// set the join tag for first column
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) &&
pRuntimeEnv->pTsBuf != NULL && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (pQuery->stableQuery &&
(pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) &&
(pRuntimeEnv->pTsBuf != NULL) &&
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
@ -5087,18 +5326,52 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
// handle the first table
STableQueryInfo* pCheckInfo = taosArrayGetP(group, tableIndex);
STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey};
taosArrayPush(tx, &info);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
if (pRuntimeEnv->pQueryHandle == NULL) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pRuntimeEnv->qinfo, &pQuery->memRef);
} else {
tsdbResetQueryHandleForNewTable(pRuntimeEnv->pQueryHandle, &cond, &gp);
}
taosArrayDestroy(tx);
taosArrayDestroy(g1);
}
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// TODO set the tags scan handle
if (onlyQueryTags(pQuery)) {
return TSDB_CODE_SUCCESS;
} else if (isTsCompQuery(pQuery)) {
setTableQueryHandle(pRuntimeEnv, 0);
return terrno;
}
// if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
// return TSDB_CODE_SUCCESS;
// }
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
if (!isSTableQuery
@ -5175,11 +5448,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->tsdb = tsdb;
pQuery->topBotQuery = isTopBottomQuery(pQuery);
pQuery->hasTagResults = hasTagValOutput(pQuery);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
pRuntimeEnv->prevResult = prevResult;
pRuntimeEnv->qinfo = pQInfo;
setScanLimitationByResultBuffer(pQuery);
@ -5200,11 +5476,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->cur.vgroupIndex = -1;
if (onlyQueryTags(pQuery)) {
pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (isTsCompQuery(pQuery)) {
pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (needReverseScan(pQuery)) {
pRuntimeEnv->proot = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
}
if (pTsBuf != NULL) {
@ -5233,7 +5512,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
// create runtime environment
int32_t numOfTables = pQuery->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQuery->vgId);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -6148,9 +6427,10 @@ static SSDataBlock* doTableScan(void* param) {
pTableScanInfo->reverseTimes = 0;
pTableScanInfo->order = cond.order;
// todo refactor, extract function
if (pResultRowInfo->size > 0) {
pResultRowInfo->curIndex = pResultRowInfo->size-1;
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey;
}
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
if (p != NULL) {
@ -6161,6 +6441,30 @@ static SSDataBlock* doTableScan(void* param) {
return NULL;
}
static SSDataBlock* doSeqTableBlockScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
STableScanInfo *pTableScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
int32_t totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
while (1) {
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
if (p != NULL) {
return p;
}
// try the next table
if (++pTableScanInfo->tableIndex >= totalTables) {
return NULL;
}
setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex);
pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle;
}
}
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
assert(repeatTime > 0);
@ -6184,6 +6488,28 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator;
}
SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = 1;
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockSeqScan";
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doSeqTableBlockScan;
return pOperator;
}
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL);
@ -6351,8 +6677,13 @@ static SSDataBlock* doSTableAggregate(void* param) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->resultRowInfo);
if (isTsCompQuery(pQuery)) {
finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
}
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo,
pInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
@ -6368,8 +6699,9 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SArithOperatorInfo* pArithInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pArithInfo->binfo;
pArithInfo->binfo.pRes->info.rows = 0;
pInfo->pRes->info.rows = 0;
while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
@ -6378,26 +6710,21 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break;
}
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pArithInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
updateOutputBuf(pArithInfo, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
pArithInfo->binfo.pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
if (pArithInfo->binfo.pRes->info.rows >= 4096) {
pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pInfo->pRes->info.rows >= 4096) {
break;
}
}
setNumOfRes(pArithInfo->binfo.pCtx, pOperator->numOfOutput);
if (pArithInfo->binfo.pRes->info.rows > 0) {
return pArithInfo->binfo.pRes;
} else {
return NULL;
}
setNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doLimit(void* param) {
@ -6467,7 +6794,7 @@ static SSDataBlock* doOffset(void* param) {
}
}
static SSDataBlock* doHashIntervalAgg(void* param) {
static SSDataBlock* doIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -6697,7 +7024,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1);
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfRows = (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
@ -6754,9 +7084,10 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1);
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
initResultRowInfo(&pInfo->binfo.resultRowInfo, tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableAggregate";
@ -6856,7 +7187,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doHashIntervalAgg;
pOperator->exec = doIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
@ -7044,10 +7375,10 @@ static SSDataBlock* doTagScan(void* param) {
count += 1;
}
pRes->info.rows = count;
qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count);
}
pRes->info.rows = count;
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
}
@ -7163,18 +7494,9 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
void stableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
// pRuntimeEnv->resultInfo.rows = 0;
int64_t st = taosGetTimestampUs();
// if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
// (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
//multiTableQueryProcess(pQInfo);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
// } else {
// assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn);
// sequentialTableProcess(pQInfo);
// }
// record the total elapsed time
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
@ -7942,7 +8264,8 @@ static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) {
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery,
char* sql) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
@ -8323,27 +8646,16 @@ void freeQInfo(SQInfo *pQInfo) {
}
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
/*
* get the file size and set the numOfRows to be the file size, since for tsComp query,
* the returned row size is equalled to 1
* TODO handle the case that the file is too large to send back one time
*/
if (isTsCompQuery(pQuery) && (*numOfRows) > 0) {
struct stat fStat;
FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data;
if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) {
*numOfRows = fStat.st_size;
return fStat.st_size;
} else {
qError("QInfo:%p failed to get file info, file:%p, reason:%s", pQInfo, f, strerror(errno));
return 0;
}
} else {
return (size_t)(pQuery->resultRowSize * (*numOfRows));
}
}
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// the remained number of retrieved rows, not the interpolated result
@ -8352,12 +8664,13 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// load data from file to msg buffer
if (isTsCompQuery(pQuery)) {
FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; // TODO refactor
SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0);
FILE *f = *(FILE **)pColInfoData->pData; // TODO refactor
// make sure file exist
if (f) {
off_t s = lseek(fileno(f), 0, SEEK_END);
assert(s == pRuntimeEnv->outputBuf->info.rows);
qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, (uint64_t)s);
if (fseek(f, 0, SEEK_SET) >= 0) {
@ -8372,14 +8685,13 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
assert(0);
}
// dump error info
if (s <= (sizeof(STSBufFileHeader) + sizeof(STSGroupBlockInfo) + 6 * sizeof(int32_t))) {
qDump(data, s);
assert(0);
}
fclose(f);
// *(FILE **)pQuery->sdata[0]->data = NULL;
}
// all data returned, set query over

View File

@ -312,10 +312,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int64_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = getResultSize(pQInfo, &s);
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = pQuery->resultRowSize * s;
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);

View File

@ -453,6 +453,39 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
resetCheckInfo(pQueryHandle);
}
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
STsdbQueryHandle* pQueryHandle = queryHandle;
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->locateStart = false;
pQueryHandle->loadExternalRow = pCond->loadExternalRows;
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
// allocate buffer in order to load data blocks from file
memset(pQueryHandle->statis, 0, sizeof(SDataStatis));
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta);
if (pQueryHandle->pTableCheckInfo == NULL) {
tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
}
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList);
@ -3105,23 +3138,26 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
return NULL;
}
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
size_t size = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
destroyTableMemIterator(p);
tfree(p->pCompInfo);
}
taosArrayDestroy(pTableCheckInfo);
return NULL;
}
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle == NULL) {
return;
}
if (pQueryHandle->pTableCheckInfo != NULL) {
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
destroyTableMemIterator(pTableCheckInfo);
tfree(pTableCheckInfo->pCompInfo);
}
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
}
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
taosArrayDestroy(pQueryHandle->defaultLoadColumn);