[td-2895] refactor.

This commit is contained in:
Haojun Liao 2021-02-24 18:16:48 +08:00
parent 9d4b925d39
commit e8ff71f74b
7 changed files with 268 additions and 245 deletions

View File

@ -825,13 +825,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
pSqlFuncExpr->colType = htons(pExpr->resType);
pSqlFuncExpr->colBytes = htons(pExpr->resBytes);
} else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
SSchema *s = tGetTbnameColumnSchema();
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
} else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pSqlFuncExpr->colType = htons(s.type);
pSqlFuncExpr->colBytes = htons(s.bytes);
} else {
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId);
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
}
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
@ -869,14 +887,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId);
pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
bool assign = false;
for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f);
if (pe == pExpr) {
pSqlFuncExpr1->colInfo.colIndex = htons(f);
pSqlFuncExpr1->colType = htons(pe->resType);
pSqlFuncExpr1->colBytes = htons(pe->resBytes);
assign = true;
break;
}
}
assert(assign);
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else {

View File

@ -387,7 +387,7 @@ typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN];
char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex;
/* sql function msg, to describe the message to vnode about sql function
@ -395,7 +395,10 @@ typedef struct SColIndex {
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {

View File

@ -240,6 +240,7 @@ typedef struct SQuery {
} SQuery;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
@ -256,8 +257,6 @@ typedef struct SQueryRuntimeEnv {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
// int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
@ -286,10 +285,10 @@ typedef struct SOperatorInfo {
int32_t numOfOutput;
SQueryRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo *upstream;
__operator_fn_t exec;
__operator_fn_t cleanup;
struct SOperatorInfo *upstream;
__optr_cleanup_fn_t cleanup;
} SOperatorInfo;
enum {
@ -357,6 +356,7 @@ typedef struct STableScanInfo {
SQLFunctionCtx *pCtx; // next operator query context
SResultRowInfo *pResultRowInfo;
int32_t numOfOutput;
int32_t *rowCellInfoOffset;
int64_t elapsedTime;
@ -367,18 +367,18 @@ typedef struct STagScanInfo {
SSDataBlock* pRes;
} STagScanInfo;
typedef struct SAggOperatorInfo {
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SSDataBlock *pRes;
} SAggOperatorInfo;
} SOptrBasicInfo;
typedef struct SOptrBasicInfo SAggOperatorInfo;
typedef struct SOptrBasicInfo SHashIntervalOperatorInfo;
typedef struct SArithOperatorInfo {
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pOutput;
SOptrBasicInfo binfo;
int32_t bufCapacity;
} SArithOperatorInfo;
@ -392,22 +392,12 @@ typedef struct SOffsetOperatorInfo {
int64_t currentOffset;
} SOffsetOperatorInfo;
typedef struct SHashIntervalOperatorInfo {
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
} SHashIntervalOperatorInfo;
typedef struct SFillOperatorInfo {
SSDataBlock *pRes;
} SFillOperatorInfo;
typedef struct SHashGroupbyOperatorInfo {
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
SOptrBasicInfo binfo;
int32_t colIndex;
} SHashGroupbyOperatorInfo;

View File

@ -3566,7 +3566,6 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
// pCtx->pOutput += pCtx->outputBytes * pCtx->size;
}
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {

View File

@ -183,17 +183,30 @@ static int32_t getNumOfScanTimes(SQuery* pQuery);
static bool isFixedOutputQuery(SQuery* pQuery);
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput);
static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput);
static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv);
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
@ -859,8 +872,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock);
static UNUSED_FUNC void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin, int32_t offset,
int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) {
static void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin,
int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool hasPrev = pCtx[0].preAggVals.isSet;
@ -1093,53 +1106,6 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const
return ts;
}
static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock, int32_t order) {
if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows;
pCtx[i].order = order;
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
SQLFunctionCtx* pCtx1 = &pCtx[i];
pCtx1->pInput = p->pData;
pCtx1->inputType = p->info.type;
pCtx1->inputBytes = p->info.bytes;
uint32_t status = aAggs[pCtx1->functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
pCtx1->ptsList = tsInfo->pData;
}
}
}
} else {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows;
pCtx[i].order = order;
}
}
}
static void aggApplyFunctions(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = startTs;// this can be set during create the struct
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) {
sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols;
sas->pArithExpr = pExprInfo;
@ -1159,7 +1125,77 @@ static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSData
}
}
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SExprInfo *pExprInfo, int32_t numOfOutput) {
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pBlock->info.rows;
pCtx[i].order = order;
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
}
}
static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
if (pCtx[0].functionId == TSDB_FUNC_ARITHM) {
SArithmeticSupport* pSupport = (SArithmeticSupport*) pCtx[0].param[1].pz;
if (pSupport->colList == NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}
} else {
if (pCtx[0].pInput == NULL && pBlock->pDataBlock != NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}
}
}
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pBlock->info.rows;
pCtx[i].order = order;
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
if (pCtx[i].functionId == TSDB_FUNC_ARITHM) {
setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes);
uint32_t status = aAggs[pCtx[i].functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
pCtx[i].ptsList = tsInfo->pData;
}
}
}
}
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = startTs;// this can be set during create the struct
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < numOfOutput; ++k) {
@ -1290,16 +1326,16 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
}
// TODO compare with the previous value to speedup the query processing
int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->rowCellInfoOffset);
int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->binfo.resultRowInfo, pInfo->binfo.pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->binfo.rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
pInfo->pCtx[k].size = 1; // TODO refactor: extract from here
int32_t functionId = pInfo->pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pInfo->pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->pCtx[k], offset);
pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here
int32_t functionId = pInfo->binfo.pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], offset);
}
}
}
@ -1998,6 +2034,10 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
}
*rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t));
if (*rowCellInfoOffset == 0) {
tfree(pFuncCtx);
return NULL;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base;
@ -2012,26 +2052,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->requireNull = false;
}
// set the input along with the setting of the input buffer
int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) {
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
SSchema *s = tGetTbnameColumnSchema();
pCtx->inputBytes = s->bytes;
pCtx->inputType = s->type;
} else if (pIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pCtx->inputBytes = s.bytes;
pCtx->inputType = s.type;
} else {
pCtx->inputBytes = pQuery->tagColList[index].bytes;
pCtx->inputType = pQuery->tagColList[index].type;
}
} else if (TSDB_COL_IS_UD_COL(pIndex->flag)) {
pCtx->inputBytes = pSqlFuncMsg->arg[0].argBytes;
pCtx->inputType = pSqlFuncMsg->arg[0].argType;
}
pCtx->inputBytes = pSqlFuncMsg->colBytes;
pCtx->inputType = pSqlFuncMsg->colType;
pCtx->ptsOutputBuf = NULL;
@ -2093,25 +2115,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->param[2].i64 = pQuery->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == TSDB_FUNC_ARITHM) {
pRuntimeEnv->sasArray[i].data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (pRuntimeEnv->sasArray[i].data == NULL) {
goto _clean;
}
pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i];
}
if (i > 0) {
// (*offset)[i] = (*offset)[i - 1] + pFuncCtx[i - 1].outputBytes;
(*rowCellInfoOffset)[i] = (*rowCellInfoOffset)[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes;
}
}
return pFuncCtx;
_clean:
tfree(pFuncCtx);
return NULL;
}
static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
@ -2163,12 +2175,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
// if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table
// if (!pQuery->groupbyColumn && !pQuery->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
// resetDefaultResInfoOutputBuf(pRuntimeEnv);
// }
// if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) {
// goto _clean;
// }
@ -2265,7 +2271,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
qDebug("QInfo:%p teardown runtime env", pQInfo);
// cleanupResultRowInfo(&pRuntimeEnv->resultRowInfo);
if (isTsCompQuery(pQuery)) {
FILE *f = *(FILE **)pQuery->sdata[0]->data;
@ -2276,25 +2281,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
// if (pRuntimeEnv->pCtx != NULL) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
//
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// tVariantDestroy(&pCtx->param[j]);
// }
//
// tVariantDestroy(&pCtx->tag);
// tfree(pCtx->tagInfo.pTagCtxList);
// }
//
// tfree(pRuntimeEnv->pCtx);
// }
if (pRuntimeEnv->sasArray != NULL) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pRuntimeEnv->sasArray[i].data);
tfree(pRuntimeEnv->sasArray[i].colList);
}
tfree(pRuntimeEnv->sasArray);
@ -2308,7 +2298,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf);
tfree(pRuntimeEnv->keyBuf);
// tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->prevRow);
tfree(pRuntimeEnv->tagVal);
@ -2319,7 +2308,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
pRuntimeEnv->outputBuf = destroyOutputBuf(pRuntimeEnv->outputBuf);
destroyOperatorInfo(pRuntimeEnv->proot);
}
@ -3575,7 +3563,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR
}
void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
SSDataBlock* pDataBlock = pInfo->pOutput;
SSDataBlock* pDataBlock = pInfo->binfo.pRes;
if (pInfo->bufCapacity < pDataBlock->info.rows + numOfInputRows) {
int32_t newSize = pDataBlock->info.rows + numOfInputRows;
@ -3593,7 +3581,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
pInfo->binfo.pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
}
}
@ -4581,9 +4569,11 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
STimeWindow* w = &pBlock->info.window;
w->skey = *(int64_t*)pInfoData->pData;
w->ekey = *(int64_t*)(pInfoData->pData + TSDB_KEYSIZE * (pBlock->info.rows - 1));
}
}
static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv,
@ -5223,7 +5213,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
if (onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createTagScanOperator(pRuntimeEnv);
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv);
} else if (needReverseScan(pQuery)) {
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
@ -6204,25 +6194,25 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
assert(pTableScanInfo != NULL && pDownstream != NULL);
char* name = pDownstream->name;
if ((strcasecmp(name, "AggregationOp") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
SAggOperatorInfo* pAggInfo = pDownstream->info;
pTableScanInfo->pCtx = pAggInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0) {
} else if (strcasecmp(name, "HashIntervalAgg") == 0) {
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashGroupbyAggOp") == 0) {
} else if (strcasecmp(name, "HashGroupbyAgg") == 0) {
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
pTableScanInfo->pCtx = pGroupbyInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset;
pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->info;
@ -6234,10 +6224,9 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (strcasecmp(name, "ArithmeticOp") == 0) {
SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else {
assert(0);
}
@ -6304,17 +6293,15 @@ static SSDataBlock* doAggregate(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
}
pOperator->completed = true;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset);
pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput);
destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols);
return pAggInfo->pRes;
}
@ -6359,11 +6346,11 @@ static SSDataBlock* doSTableAggregate(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock);
}
closeAllResultRows(&pAggInfo->resultRowInfo);
@ -6375,7 +6362,6 @@ static SSDataBlock* doSTableAggregate(void* param) {
pOperator->completed = true;
}
destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols);
return pAggInfo->pRes;
}
@ -6385,10 +6371,8 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SArithOperatorInfo* pArithInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset);
pRuntimeEnv->pQuery->pos = 0;
pArithInfo->pOutput->info.rows = 0;
pArithInfo->binfo.pRes->info.rows = 0;
while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
@ -6397,39 +6381,20 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break;
}
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->pCtx, pOperator->numOfOutput);
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pArithInfo->pCtx[i].size = pBlock->info.rows;
if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) {
setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j);
if (p->info.colId == pCol->colId) {
pArithInfo->pCtx[i].pInput = p->pData;
pArithInfo->pCtx[i].inputType = p->info.type;
pArithInfo->pCtx[i].inputBytes = p->info.bytes;
break;
}
}
}
}
}
setInputDataBlock(pOperator, pArithInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
updateOutputBuf(pArithInfo, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput);
pArithInfo->pOutput->info.rows += pBlock->info.rows;
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
if (pArithInfo->pOutput->info.rows >= 4096) {
pArithInfo->binfo.pRes->info.rows += pBlock->info.rows;
if (pArithInfo->binfo.pRes->info.rows >= 4096) {
break;
}
}
return pArithInfo->pOutput;
return pArithInfo->binfo.pRes;
}
static SSDataBlock* doLimit(void* param) {
@ -6523,7 +6488,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0);
}
@ -6581,7 +6546,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current;
setTagVal_rv(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex);
@ -6609,11 +6574,11 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pInfo->pRes;
return pInfo->binfo.pRes;
}
SOperatorInfo* upstream = pOperator->upstream;
@ -6626,7 +6591,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock);
}
@ -6634,23 +6599,23 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock);
}
closeAllResultRows(&pInfo->resultRowInfo);
closeAllResultRows(&pInfo->binfo.resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
if (!pRuntimeEnv->pQuery->stableQuery) {
finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
finalizeQueryResult_rv(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
}
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pInfo->pRes;
return pInfo->binfo.pRes;
}
static SSDataBlock* doFill(void* param) {
@ -6702,7 +6667,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
if (pOperator->cleanup != NULL) {
pOperator->cleanup(pOperator->info);
pOperator->cleanup(pOperator->info, pOperator->numOfOutput);
}
destroyOperatorInfo(pOperator->upstream);
@ -6720,47 +6685,81 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "Aggregate";
pOperator->name = "TableAggregate";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doAggregate;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAggregate;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
assert(pInfo != NULL);
destroySQLFunctionCtx(pInfo->pCtx, numOfOutput);
tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput);
}
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SHashGroupbyOperatorInfo* pInfo = (SHashGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = (SArithOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableAggregate";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doSTableAggregate;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->pOutput = createOutputBuf(pExpr, numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, pInfo->binfo.rowCellInfoOffset);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp";
@ -6768,15 +6767,17 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
pOperator->completed = false;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->exec = doArithmeticOperation;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doArithmeticOperation;
pOperator->cleanup = destroyArithOperatorInfo;
return pOperator;
}
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
@ -6793,7 +6794,7 @@ static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return pOperator;
}
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo));
pInfo->offset = pRuntimeEnv->pQuery->limit.offset;
@ -6812,7 +6813,7 @@ static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
return pOperator;
}
static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
@ -6825,35 +6826,37 @@ static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEn
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doHashIntervalAgg;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doHashIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableIntervalAggOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doSTableIntervalAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
@ -6861,26 +6864,27 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashGroupby";
pOperator->name = "HashGroupbyAgg";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doHashGroupbyAgg;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doHashGroupbyAgg;
pOperator->cleanup = destroyGroupbyOperatorInfo;
return pOperator;
}
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
@ -6891,12 +6895,14 @@ static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doFill;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo;
return pOperator;
}
@ -7035,7 +7041,7 @@ static SSDataBlock* doTagScan(void* param) {
return pTagScanInfo->pRes;
}
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -7432,6 +7438,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->colType = htons(pExprMsg->colType);
pExprMsg->colBytes = htons(pExprMsg->colBytes);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->resColId = htons(pExprMsg->resColId);
@ -7471,6 +7480,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->colType = htons(pExprMsg->colType);
pExprMsg->colBytes = htons(pExprMsg->colBytes);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
@ -8036,7 +8048,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
// allocate additional memory for interResults that are usually larger then final results
// TODO refactor
int16_t bytes = 0;
if (pQuery->pExpr2 == NULL || col > pQuery->numOfExpr2) {
if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) {
bytes = pExprs[col].bytes;
} else {
bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes);

View File

@ -279,7 +279,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity);
pFillInfo->pData[i] = NULL;
if (TSDB_COL_IS_TAG(pColInfo->flag)) {
bool exists = false;
@ -378,9 +378,9 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->nextValues);
tfree(pFillInfo->pTags);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
tfree(pFillInfo->pData[i]);
}
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// tfree(pFillInfo->pData[i]);
// }
tfree(pFillInfo->pData);
tfree(pFillInfo->pFillCol);

View File

@ -980,10 +980,6 @@ if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data1
if $data01 != 2.000000000 then
return -1
endi