fix(query): fix bug in calculating the aggregate function with constant numeric value as input parameter.
This commit is contained in:
parent
f0463a6a58
commit
7429f37c0c
|
@ -220,14 +220,9 @@ typedef struct SExecTaskInfo {
|
||||||
} SExecTaskInfo;
|
} SExecTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskRuntimeEnv {
|
typedef struct STaskRuntimeEnv {
|
||||||
|
|
||||||
jmp_buf env;
|
|
||||||
STaskAttr* pQueryAttr;
|
STaskAttr* pQueryAttr;
|
||||||
uint32_t status; // query status
|
uint32_t status; // query status
|
||||||
void* qinfo;
|
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||||
void* pTsdbReadHandle;
|
|
||||||
bool enableGroupData;
|
|
||||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||||
|
@ -235,12 +230,10 @@ typedef struct STaskRuntimeEnv {
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||||
char** prevRow;
|
char** prevRow;
|
||||||
SArray* prevResult; // intermediate result, SArray<SInterResult>
|
|
||||||
STSBuf* pTsBuf; // timestamp filter list
|
STSBuf* pTsBuf; // timestamp filter list
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
|
|
||||||
char* tagVal; // tag value of current data block
|
char* tagVal; // tag value of current data block
|
||||||
struct SScalarFunctionSupport* scalarSup;
|
|
||||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||||
struct SOperatorInfo* proot;
|
struct SOperatorInfo* proot;
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
|
@ -266,7 +259,6 @@ typedef struct SOperatorInfo {
|
||||||
char* name; // name, used to show the query execution plan
|
char* name; // name, used to show the query execution plan
|
||||||
void* info; // extension attribution
|
void* info; // extension attribution
|
||||||
SExprInfo* pExpr;
|
SExprInfo* pExpr;
|
||||||
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
|
|
||||||
SExecTaskInfo* pTaskInfo;
|
SExecTaskInfo* pTaskInfo;
|
||||||
SOperatorCostInfo cost;
|
SOperatorCostInfo cost;
|
||||||
SResultInfo resultInfo;
|
SResultInfo resultInfo;
|
||||||
|
@ -291,7 +283,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
EX_SOURCE_DATA_NOT_READY = 0x1,
|
EX_SOURCE_DATA_NOT_READY = 0x1,
|
||||||
EX_SOURCE_DATA_READY = 0x2,
|
EX_SOURCE_DATA_READY = 0x2,
|
||||||
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
||||||
} EX_SOURCE_STATUS;
|
} EX_SOURCE_STATUS;
|
||||||
|
|
||||||
|
|
|
@ -186,10 +186,9 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
|
||||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
||||||
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock, SColumn* pColumn);
|
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
|
||||||
|
|
||||||
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
||||||
static bool hasMainOutput(STaskAttr* pQueryAttr);
|
|
||||||
|
|
||||||
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
|
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
|
||||||
|
|
||||||
|
@ -456,7 +455,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
if (pResultRowInfo->size == 0) {
|
if (pResultRowInfo->size == 0) {
|
||||||
existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table.
|
existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table.
|
||||||
// assert(pResultRowInfo->curPos == -1);
|
|
||||||
} else if (pResultRowInfo->size == 1) {
|
} else if (pResultRowInfo->size == 1) {
|
||||||
SResultRowPosition* p = &pResultRowInfo->pPosition[0];
|
SResultRowPosition* p = &pResultRowInfo->pPosition[0];
|
||||||
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
|
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
|
||||||
|
@ -465,7 +463,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
|
||||||
int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
||||||
if (index != NULL) {
|
if (index != NULL) {
|
||||||
// TODO check the scan order for current opened time window
|
// TODO check the scan order for current opened time window
|
||||||
// pResultRowInfo->curPos = (int32_t)*index;
|
|
||||||
existInCurrentResusltRowInfo = true;
|
existInCurrentResusltRowInfo = true;
|
||||||
} else {
|
} else {
|
||||||
existInCurrentResusltRowInfo = false;
|
existInCurrentResusltRowInfo = false;
|
||||||
|
@ -505,7 +502,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. set the new time window to be the new active time window
|
// 2. set the new time window to be the new active time window
|
||||||
// pResultRowInfo->curPos = pResultRowInfo->size;
|
|
||||||
pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
|
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
|
||||||
|
@ -1035,13 +1031,13 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows,
|
||||||
return ts;
|
return ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/);
|
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1053,22 +1049,64 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) {
|
||||||
|
SColumnInfoData* pColInfo = NULL;
|
||||||
|
if (pInput->pData[paramIndex] == NULL) {
|
||||||
|
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
|
if (pColInfo == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the correct column info (data type and bytes)
|
||||||
|
pColInfo->info.type = type;
|
||||||
|
pColInfo->info.bytes = tDataTypes[type].bytes;
|
||||||
|
|
||||||
|
pInput->pData[paramIndex] = pColInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(!IS_VAR_DATA_TYPE(type));
|
||||||
|
colInfoDataEnsureCapacity(pColInfo, numOfRows);
|
||||||
|
|
||||||
|
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
|
||||||
|
int64_t v = pFuncParam->param.i;
|
||||||
|
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
colDataAppendInt64(pColInfo, i, &v);
|
||||||
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
double v = pFuncParam->param.d;
|
||||||
|
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
colDataAppendDouble(pColInfo, i, &v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
pCtx[i].currentStage = MAIN_SCAN;
|
pCtx[i].currentStage = MAIN_SCAN;
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||||
|
|
||||||
SExprInfo* pOneExpr = &pOperator->pExpr[i];
|
SExprInfo* pOneExpr = &pOperator->pExpr[i];
|
||||||
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
||||||
SFunctParam *pFuncParam = &pOneExpr->base.pParam[j];
|
SFunctParam *pFuncParam = &pOneExpr->base.pParam[j];
|
||||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
int32_t slotId = pFuncParam->pCol->slotId;
|
int32_t slotId = pFuncParam->pCol->slotId;
|
||||||
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
pCtx[i].input.totalRows = pBlock->info.rows;
|
pInput->totalRows = pBlock->info.rows;
|
||||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
pInput->numOfRows = pBlock->info.rows;
|
||||||
pCtx[i].input.startRowIndex = 0;
|
pInput->startRowIndex = 0;
|
||||||
ASSERT(pCtx[i].input.pData[j] != NULL);
|
ASSERT(pInput->pData[j] != NULL);
|
||||||
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1111,6 +1149,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||||
|
@ -1200,7 +1240,6 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
|
|
||||||
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
||||||
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
|
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
||||||
SExprInfo* pExpr = pOperator->pExpr;
|
SExprInfo* pExpr = pOperator->pExpr;
|
||||||
|
|
||||||
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
||||||
|
@ -1220,7 +1259,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
||||||
double v1 = 0, v2 = 0, v = 0;
|
double v1 = 0, v2 = 0, v = 0;
|
||||||
|
|
||||||
if (prevRowIndex == -1) {
|
if (prevRowIndex == -1) {
|
||||||
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]);
|
// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]);
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
|
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
|
||||||
}
|
}
|
||||||
|
@ -1237,7 +1276,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
||||||
|
|
||||||
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
if (prevRowIndex == -1) {
|
if (prevRowIndex == -1) {
|
||||||
pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
|
// pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
|
||||||
} else {
|
} else {
|
||||||
pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
|
pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
@ -1507,86 +1546,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
||||||
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
|
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
|
||||||
int32_t tableGroupId) {
|
|
||||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
|
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
|
||||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
|
||||||
|
|
||||||
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
|
||||||
bool ascQuery = true;
|
|
||||||
|
|
||||||
TSKEY* tsCols = NULL;
|
|
||||||
if (pSDataBlock->pDataBlock != NULL) {
|
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
|
||||||
tsCols = (int64_t*)pColDataInfo->pData;
|
|
||||||
assert(tsCols[0] == pSDataBlock->info.window.skey &&
|
|
||||||
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
|
|
||||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
|
|
||||||
|
|
||||||
STimeWindow win = {0};//getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
|
|
||||||
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
|
|
||||||
|
|
||||||
SResultRow* pResult = NULL;
|
|
||||||
int32_t forwardStep = 0;
|
|
||||||
int32_t ret = 0;
|
|
||||||
STimeWindow preWin = win;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
// null data, failed to allocate more memory buffer
|
|
||||||
// ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
|
|
||||||
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
|
|
||||||
TSKEY ekey = 0;//reviseWindowEkey(pQueryAttr, &win);
|
|
||||||
// forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey,
|
|
||||||
// binarySearchForKey, true);
|
|
||||||
|
|
||||||
// window start(end) key interpolation
|
|
||||||
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
|
|
||||||
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
|
|
||||||
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
|
||||||
preWin = win;
|
|
||||||
|
|
||||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
|
||||||
// startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey,
|
|
||||||
// prevEndPos);
|
|
||||||
if (startPos < 0) {
|
|
||||||
// if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
|
|
||||||
// int32_t code =
|
|
||||||
// setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
|
|
||||||
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
|
|
||||||
// if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
||||||
// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// startPos = pSDataBlock->info.rows - 1;
|
|
||||||
|
|
||||||
// window start(end) key interpolation
|
|
||||||
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
|
|
||||||
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
|
|
||||||
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
|
||||||
// }
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
|
||||||
}
|
|
||||||
|
|
||||||
// if (pQueryAttr->timeWindowInterpo) {
|
|
||||||
// int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
|
|
||||||
// saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
|
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
|
||||||
pRowSup->win.ekey = ts;
|
pRowSup->win.ekey = ts;
|
||||||
pRowSup->prevTs = ts;
|
pRowSup->prevTs = ts;
|
||||||
|
@ -1730,30 +1689,85 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock, SColumn* pColumn) {
|
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) {
|
||||||
if (pBlock->pBlockAgg != NULL /*&& TSDB_COL_IS_NORMAL_COL(pColumn->flag)*/) {
|
if (pInput->pData[paramIndex] == NULL) {
|
||||||
|
pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
|
if (pInput->pData[paramIndex] == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the correct column info (data type and bytes)
|
||||||
|
pInput->pData[paramIndex]->info.type = type;
|
||||||
|
pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnDataAgg* da = NULL;
|
||||||
|
if (pInput->pColumnDataAgg[paramIndex] == NULL) {
|
||||||
|
da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
|
||||||
|
pInput->pColumnDataAgg[paramIndex] = da;
|
||||||
|
if (da == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
da = pInput->pColumnDataAgg[paramIndex];
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(!IS_VAR_DATA_TYPE(type));
|
||||||
|
|
||||||
|
if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
|
int64_t v = pFuncParam->param.i;
|
||||||
|
*da = (SColumnDataAgg) {.numOfNull = 0, .min = v, .max = v, .maxIndex = 0, .minIndex = 0, .sum = v * numOfRows};
|
||||||
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
double v = pFuncParam->param.d;
|
||||||
|
*da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
|
||||||
|
|
||||||
|
*(double*) &da->min = v;
|
||||||
|
*(double*) &da->max = v;
|
||||||
|
*(double*) &da->sum = v * numOfRows;
|
||||||
|
} else if (type == TSDB_DATA_TYPE_BOOL) { // todo validate this data type
|
||||||
|
bool v = pFuncParam->param.i;
|
||||||
|
|
||||||
|
*da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
|
||||||
|
*(bool*) &da->min = 0;
|
||||||
|
*(bool*) &da->max = v;
|
||||||
|
*(bool*) &da->sum = v * numOfRows;
|
||||||
|
} else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
|
||||||
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
pInput->numOfRows = numOfRows;
|
||||||
|
pInput->totalRows = numOfRows;
|
||||||
|
|
||||||
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
|
pInput->colDataAggIsSet = true;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
||||||
SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
|
SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
|
||||||
|
|
||||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
int32_t slotId = pFuncParam->pCol->slotId;
|
int32_t slotId = pFuncParam->pCol->slotId;
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
|
||||||
|
|
||||||
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
|
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
|
||||||
pInput->colDataAggIsSet = true;
|
|
||||||
pInput->numOfRows = pBlock->info.rows;
|
|
||||||
pInput->totalRows = pBlock->info.rows;
|
|
||||||
|
|
||||||
// Here we set the column info data since the data type for each column data is required, but
|
// Here we set the column info data since the data type for each column data is required, but
|
||||||
// the data in the corresponding SColumnInfoData will not be used.
|
// the data in the corresponding SColumnInfoData will not be used.
|
||||||
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pCtx->input.colDataAggIsSet = false;
|
pInput->colDataAggIsSet = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pCtx->hasNull = hasNull(pColumn, pAgg);
|
|
||||||
|
|
||||||
// set the statistics data for primary time stamp column
|
// set the statistics data for primary time stamp column
|
||||||
// if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
// if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
// pCtx->isAggSet = true;
|
// pCtx->isAggSet = true;
|
||||||
|
@ -2224,33 +2238,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQuery) {
|
|
||||||
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
|
|
||||||
|
|
||||||
#if defined(_DEBUG_VIEW)
|
|
||||||
printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%" PRIu64
|
|
||||||
", query order:%d, ts order:%d, traverse:%d, index:%d\n",
|
|
||||||
elem.ts, key, elem.tag.i, pQueryAttr->order.order, pRuntimeEnv->pTsBuf->tsOrder,
|
|
||||||
pRuntimeEnv->pTsBuf->cur.order, pRuntimeEnv->pTsBuf->cur.tsIndex);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (ascQuery) {
|
|
||||||
if (key < elem.ts) {
|
|
||||||
return TS_JOIN_TS_NOT_EQUALS;
|
|
||||||
} else if (key > elem.ts) {
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (key > elem.ts) {
|
|
||||||
return TS_JOIN_TS_NOT_EQUALS;
|
|
||||||
} else if (key < elem.ts) {
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TS_JOIN_TS_EQUAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
|
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t start = 0;
|
int32_t start = 0;
|
||||||
|
@ -2300,54 +2287,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
|
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
|
||||||
|
|
||||||
int8_t* p = NULL;
|
|
||||||
bool all = true;
|
|
||||||
|
|
||||||
if (pRuntimeEnv->pTsBuf != NULL) {
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
|
||||||
p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
|
|
||||||
TSKEY* k = (TSKEY*)pColInfoData->pData;
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
int32_t offset = ascQuery ? i : (numOfRows - i - 1);
|
|
||||||
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
|
|
||||||
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
|
|
||||||
break;
|
|
||||||
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
|
|
||||||
all = false;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
assert(ret == TS_JOIN_TS_EQUAL);
|
|
||||||
p[offset] = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// save the cursor status
|
|
||||||
// pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
|
|
||||||
} else {
|
|
||||||
// all = filterExecute(pRuntimeEnv->pQueryAttr->pFilters, numOfRows, &p, pBlock->pBlockAgg,
|
|
||||||
// pRuntimeEnv->pQueryAttr->numOfCols);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!all) {
|
|
||||||
if (p) {
|
|
||||||
doCompactSDataBlock(pBlock, numOfRows, p);
|
|
||||||
} else {
|
|
||||||
pBlock->info.rows = 0;
|
|
||||||
pBlock->pBlockAgg = NULL; // clean the block statistics info
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
|
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
|
||||||
static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes);
|
static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes);
|
||||||
|
|
||||||
|
@ -2936,18 +2875,6 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasMainOutput(STaskAttr* pQueryAttr) {
|
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
|
||||||
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
|
|
||||||
|
|
||||||
if (functionId != FUNCTION_TS && functionId != FUNCTION_TAG && functionId != FUNCTION_TAGPRJ) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) {
|
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) {
|
||||||
STableQueryInfo* pTableQueryInfo = buf;
|
STableQueryInfo* pTableQueryInfo = buf;
|
||||||
pTableQueryInfo->lastKey = win.skey;
|
pTableQueryInfo->lastKey = win.skey;
|
||||||
|
@ -3111,48 +3038,6 @@ void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprI
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo) {
|
|
||||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
|
|
||||||
assert(pRuntimeEnv->pTsBuf != NULL);
|
|
||||||
#if 0
|
|
||||||
// both the master and supplement scan needs to set the correct ts comp start position
|
|
||||||
if (pTableQueryInfo->cur.vgroupIndex == -1) {
|
|
||||||
taosVariantAssign(&pTableQueryInfo->tag, pTag);
|
|
||||||
|
|
||||||
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQueryAttr->vgId, &pTableQueryInfo->tag);
|
|
||||||
|
|
||||||
// failed to find data with the specified tag value and vnodeId
|
|
||||||
if (!tsBufIsValidElem(&elem)) {
|
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
//qError("QInfo:0x%"PRIx64" failed to find tag:%s in ts_comp", GET_TASKID(pRuntimeEnv), pTag->pz);
|
|
||||||
} else {
|
|
||||||
//qError("QInfo:0x%"PRIx64" failed to find tag:%" PRId64 " in ts_comp", GET_TASKID(pRuntimeEnv), pTag->i);
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep the cursor info of current table
|
|
||||||
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
|
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
|
|
||||||
} else {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
|
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
|
|
||||||
} else {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are two cases to handle:
|
* There are two cases to handle:
|
||||||
*
|
*
|
||||||
|
@ -3323,19 +3208,15 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock
|
||||||
return pOutput->info.rows;
|
return pOutput->info.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
|
void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) {
|
||||||
SQueryProfEvent event = {0};
|
SQueryProfEvent event = {0};
|
||||||
|
|
||||||
event.eventType = eventType;
|
event.eventType = eventType;
|
||||||
event.eventTime = taosGetTimestampUs();
|
event.eventTime = taosGetTimestampUs();
|
||||||
event.operatorType = operatorInfo->operatorType;
|
event.operatorType = pOperator->operatorType;
|
||||||
|
// if (pQInfo->summary.queryProfEvents) {
|
||||||
if (operatorInfo->pRuntimeEnv) {
|
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
|
||||||
// SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo;
|
// }
|
||||||
// if (pQInfo->summary.queryProfEvents) {
|
|
||||||
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) {
|
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) {
|
||||||
|
@ -5313,7 +5194,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order);
|
||||||
hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore the value
|
// restore the value
|
||||||
|
@ -5387,57 +5268,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
||||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
|
||||||
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
|
||||||
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pIntervalInfo->binfo.pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
|
||||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
|
||||||
|
|
||||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
|
||||||
// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
|
|
||||||
// setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
|
|
||||||
|
|
||||||
// hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
|
||||||
// pQueryAttr->order.order = order; // TODO : restore the order
|
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
|
||||||
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pIntervalInfo->binfo.pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
@ -6480,8 +6310,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
pExp->pExpr->_function.num = 1;
|
pExp->pExpr->_function.num = 1;
|
||||||
pExp->pExpr->_function.functionId = -1;
|
pExp->pExpr->_function.functionId = -1;
|
||||||
|
|
||||||
|
int32_t type = nodeType(pTargetNode->pExpr);
|
||||||
// it is a project query, or group by column
|
// it is a project query, or group by column
|
||||||
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
|
if (type == QUERY_NODE_COLUMN) {
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
||||||
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
|
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
@ -6492,7 +6323,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
||||||
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
|
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
|
||||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) {
|
} else if (type == QUERY_NODE_VALUE) {
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
||||||
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
|
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
@ -6503,7 +6334,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName);
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName);
|
||||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
||||||
valueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
valueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
||||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
|
} else if (type == QUERY_NODE_FUNCTION) {
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
||||||
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
@ -6514,14 +6345,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
||||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
|
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
|
||||||
|
|
||||||
// TODO: value parameter needs to be handled
|
|
||||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||||
pExp->base.numOfParams = numOfParam;
|
pExp->base.numOfParams = numOfParam;
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||||
if (p1->type == QUERY_NODE_COLUMN) {
|
if (p1->type == QUERY_NODE_COLUMN) {
|
||||||
SColumnNode* pcn = (SColumnNode*) p1;
|
SColumnNode* pcn = (SColumnNode*) p1;
|
||||||
|
|
||||||
|
@ -6530,9 +6360,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||||
SValueNode* pvn = (SValueNode*)p1;
|
SValueNode* pvn = (SValueNode*)p1;
|
||||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||||
|
valueNodeToVariant(pvn, &pExp->base.pParam[j].param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) {
|
} else if (type == QUERY_NODE_OPERATOR) {
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||||
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
|
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
@ -6541,11 +6372,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
|
|
||||||
SDataType* pType = &pNode->node.resType;
|
SDataType* pType = &pNode->node.resType;
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
|
||||||
|
|
||||||
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
||||||
|
|
||||||
// pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
|
||||||
// pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue