Merge branch '3.0' into feature/TD-11463-3.0

This commit is contained in:
Cary Xu 2022-04-18 15:10:21 +08:00
commit ca93af74a1
6 changed files with 192 additions and 377 deletions

View File

@ -220,14 +220,9 @@ typedef struct SExecTaskInfo {
} SExecTaskInfo; } SExecTaskInfo;
typedef struct STaskRuntimeEnv { typedef struct STaskRuntimeEnv {
jmp_buf env;
STaskAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
void* pTsdbReadHandle;
bool enableGroupData;
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
@ -235,12 +230,10 @@ typedef struct STaskRuntimeEnv {
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object. // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow; char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list STSBuf* pTsBuf; // timestamp filter list
STSCursor cur; STSCursor cur;
char* tagVal; // tag value of current data block char* tagVal; // tag value of current data block
struct SScalarFunctionSupport* scalarSup;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo* proot; struct SOperatorInfo* proot;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
@ -266,7 +259,6 @@ typedef struct SOperatorInfo {
char* name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo; SResultInfo resultInfo;
@ -291,7 +283,7 @@ typedef struct {
typedef enum { typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3, EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS; } EX_SOURCE_STATUS;
@ -682,7 +674,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList); int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);

View File

@ -186,10 +186,9 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx); static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock, SColumn* pColumn); static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static bool hasMainOutput(STaskAttr* pQueryAttr);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
@ -456,7 +455,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
if (p1 != NULL) { if (p1 != NULL) {
if (pResultRowInfo->size == 0) { if (pResultRowInfo->size == 0) {
existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table. existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table.
// assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) { } else if (pResultRowInfo->size == 1) {
SResultRowPosition* p = &pResultRowInfo->pPosition[0]; SResultRowPosition* p = &pResultRowInfo->pPosition[0];
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset); existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
@ -465,7 +463,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) { if (index != NULL) {
// TODO check the scan order for current opened time window // TODO check the scan order for current opened time window
// pResultRowInfo->curPos = (int32_t)*index;
existInCurrentResusltRowInfo = true; existInCurrentResusltRowInfo = true;
} else { } else {
existInCurrentResusltRowInfo = false; existInCurrentResusltRowInfo = false;
@ -505,7 +502,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
} }
// 2. set the new time window to be the new active time window // 2. set the new time window to be the new active time window
// pResultRowInfo->curPos = pResultRowInfo->size;
pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
@ -1035,40 +1031,84 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows,
return ts; return ts;
} }
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].size = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/); setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
} }
} }
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) {
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
} else { } else {
doSetInputDataBlock(pOperator, pCtx, pBlock, order); doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol);
} }
} }
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) {
SColumnInfoData* pColInfo = NULL;
if (pInput->pData[paramIndex] == NULL) {
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pColInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// Set the correct column info (data type and bytes)
pColInfo->info.type = type;
pColInfo->info.bytes = tDataTypes[type].bytes;
pInput->pData[paramIndex] = pColInfo;
}
ASSERT(!IS_VAR_DATA_TYPE(type));
colInfoDataEnsureCapacity(pColInfo, numOfRows);
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
int64_t v = pFuncParam->param.i;
for(int32_t i = 0; i < numOfRows; ++i) {
colDataAppendInt64(pColInfo, i, &v);
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double v = pFuncParam->param.d;
for(int32_t i = 0; i < numOfRows; ++i) {
colDataAppendDouble(pColInfo, i, &v);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].size = pBlock->info.rows;
pCtx[i].currentStage = MAIN_SCAN; pCtx[i].currentStage = MAIN_SCAN;
SInputColumnInfoData* pInput = &pCtx[i].input;
SExprInfo* pOneExpr = &pOperator->pExpr[i]; SExprInfo* pOneExpr = &pOperator->pExpr[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam *pFuncParam = &pOneExpr->base.pParam[j]; SFunctParam *pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId; int32_t slotId = pFuncParam->pCol->slotId;
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
pCtx[i].input.totalRows = pBlock->info.rows; pInput->totalRows = pBlock->info.rows;
pCtx[i].input.numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
pCtx[i].input.startRowIndex = 0; pInput->startRowIndex = 0;
ASSERT(pCtx[i].input.pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
if (createDummyCol) {
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} }
} }
@ -1111,6 +1151,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
// } // }
// } // }
} }
return code;
} }
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
@ -1200,7 +1242,6 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SExprInfo* pExpr = pOperator->pExpr; SExprInfo* pExpr = pOperator->pExpr;
SqlFunctionCtx* pCtx = pInfo->pCtx; SqlFunctionCtx* pCtx = pInfo->pCtx;
@ -1220,7 +1261,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
double v1 = 0, v2 = 0, v = 0; double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) { if (prevRowIndex == -1) {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]); // GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]);
} else { } else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
} }
@ -1237,7 +1278,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
if (prevRowIndex == -1) { if (prevRowIndex == -1) {
pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index]; // pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
} else { } else {
pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes; pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
} }
@ -1507,86 +1548,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
} }
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = true;
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*)pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
STimeWindow win = {0};//getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t forwardStep = 0;
int32_t ret = 0;
STimeWindow preWin = win;
while (1) {
// null data, failed to allocate more memory buffer
// ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = 0;//reviseWindowEkey(pQueryAttr, &win);
// forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey,
// binarySearchForKey, true);
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
preWin = win;
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
// startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey,
// prevEndPos);
if (startPos < 0) {
// if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
// int32_t code =
// setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
// if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
// }
//
// startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
// }
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
// if (pQueryAttr->timeWindowInterpo) {
// int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
// saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
// }
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) { static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
pRowSup->win.ekey = ts; pRowSup->win.ekey = ts;
pRowSup->prevTs = ts; pRowSup->prevTs = ts;
@ -1730,30 +1691,85 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return true; return true;
} }
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock, SColumn* pColumn) { static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) {
if (pBlock->pBlockAgg != NULL /*&& TSDB_COL_IS_NORMAL_COL(pColumn->flag)*/) { if (pInput->pData[paramIndex] == NULL) {
pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pInput->pData[paramIndex] == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// Set the correct column info (data type and bytes)
pInput->pData[paramIndex]->info.type = type;
pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
}
SColumnDataAgg* da = NULL;
if (pInput->pColumnDataAgg[paramIndex] == NULL) {
da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
pInput->pColumnDataAgg[paramIndex] = da;
if (da == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
da = pInput->pColumnDataAgg[paramIndex];
}
ASSERT(!IS_VAR_DATA_TYPE(type));
if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t v = pFuncParam->param.i;
*da = (SColumnDataAgg) {.numOfNull = 0, .min = v, .max = v, .maxIndex = 0, .minIndex = 0, .sum = v * numOfRows};
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double v = pFuncParam->param.d;
*da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
*(double*) &da->min = v;
*(double*) &da->max = v;
*(double*) &da->sum = v * numOfRows;
} else if (type == TSDB_DATA_TYPE_BOOL) { // todo validate this data type
bool v = pFuncParam->param.i;
*da = (SColumnDataAgg) {.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
*(bool*) &da->min = 0;
*(bool*) &da->max = v;
*(bool*) &da->sum = v * numOfRows;
} else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
// do nothing
} else {
ASSERT(0);
}
return TSDB_CODE_SUCCESS;
}
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
int32_t numOfRows = pBlock->info.rows;
SInputColumnInfoData* pInput = &pCtx->input;
pInput->numOfRows = numOfRows;
pInput->totalRows = numOfRows;
if (pBlock->pBlockAgg != NULL) {
pInput->colDataAggIsSet = true;
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pExprInfo->base.pParam[j]; SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId; int32_t slotId = pFuncParam->pCol->slotId;
SInputColumnInfoData* pInput = &pCtx->input;
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
pInput->colDataAggIsSet = true;
pInput->numOfRows = pBlock->info.rows;
pInput->totalRows = pBlock->info.rows;
// Here we set the column info data since the data type for each column data is required, but // Here we set the column info data since the data type for each column data is required, but
// the data in the corresponding SColumnInfoData will not be used. // the data in the corresponding SColumnInfoData will not be used.
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
} }
} }
} else { } else {
pCtx->input.colDataAggIsSet = false; pInput->colDataAggIsSet = false;
} }
// pCtx->hasNull = hasNull(pColumn, pAgg);
// set the statistics data for primary time stamp column // set the statistics data for primary time stamp column
// if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
// pCtx->isAggSet = true; // pCtx->isAggSet = true;
@ -2224,33 +2240,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
return false; return false;
} }
static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQuery) {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
#if defined(_DEBUG_VIEW)
printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%" PRIu64
", query order:%d, ts order:%d, traverse:%d, index:%d\n",
elem.ts, key, elem.tag.i, pQueryAttr->order.order, pRuntimeEnv->pTsBuf->tsOrder,
pRuntimeEnv->pTsBuf->cur.order, pRuntimeEnv->pTsBuf->cur.tsIndex);
#endif
if (ascQuery) {
if (key < elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key > elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
}
} else {
if (key > elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key < elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
}
}
return TS_JOIN_TS_EQUAL;
}
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
int32_t len = 0; int32_t len = 0;
int32_t start = 0; int32_t start = 0;
@ -2300,54 +2289,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
} }
} }
void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t* p = NULL;
bool all = true;
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
TSKEY* k = (TSKEY*)pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery ? i : (numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
break;
}
}
// save the cursor status
// pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
// all = filterExecute(pRuntimeEnv->pQueryAttr->pFilters, numOfRows, &p, pBlock->pBlockAgg,
// pRuntimeEnv->pQueryAttr->numOfCols);
}
if (!all) {
if (p) {
doCompactSDataBlock(pBlock, numOfRows, p);
} else {
pBlock->info.rows = 0;
pBlock->pBlockAgg = NULL; // clean the block statistics info
}
}
taosMemoryFreeClear(p);
}
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes); static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes);
@ -2936,18 +2877,6 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
} }
} }
static bool hasMainOutput(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId != FUNCTION_TS && functionId != FUNCTION_TAG && functionId != FUNCTION_TAGPRJ) {
return true;
}
}
return false;
}
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) { STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) {
STableQueryInfo* pTableQueryInfo = buf; STableQueryInfo* pTableQueryInfo = buf;
pTableQueryInfo->lastKey = win.skey; pTableQueryInfo->lastKey = win.skey;
@ -2987,8 +2916,12 @@ void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx,
continue; continue;
} }
if (!pResInfo->initialized && pCtx[i].functionId != -1) { if (!pResInfo->initialized) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo); if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
} else {
pResInfo->initialized = true;
}
} }
} }
} }
@ -3111,48 +3044,6 @@ void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprI
// } // }
} }
int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pRuntimeEnv->pTsBuf != NULL);
#if 0
// both the master and supplement scan needs to set the correct ts comp start position
if (pTableQueryInfo->cur.vgroupIndex == -1) {
taosVariantAssign(&pTableQueryInfo->tag, pTag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQueryAttr->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
//qError("QInfo:0x%"PRIx64" failed to find tag:%s in ts_comp", GET_TASKID(pRuntimeEnv), pTag->pz);
} else {
//qError("QInfo:0x%"PRIx64" failed to find tag:%" PRId64 " in ts_comp", GET_TASKID(pRuntimeEnv), pTag->i);
}
return -1;
}
// Keep the cursor info of current table
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
//qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
//qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
} else {
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
//qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
//qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
}
#endif
return 0;
}
/* /*
* There are two cases to handle: * There are two cases to handle:
* *
@ -3323,19 +3214,15 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock
return pOutput->info.rows; return pOutput->info.rows;
} }
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) { void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) {
SQueryProfEvent event = {0}; SQueryProfEvent event = {0};
event.eventType = eventType; event.eventType = eventType;
event.eventTime = taosGetTimestampUs(); event.eventTime = taosGetTimestampUs();
event.operatorType = operatorInfo->operatorType; event.operatorType = pOperator->operatorType;
// if (pQInfo->summary.queryProfEvents) {
if (operatorInfo->pRuntimeEnv) { // taosArrayPush(pQInfo->summary.queryProfEvents, &event);
// SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo; // }
// if (pQInfo->summary.queryProfEvents) {
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
// }
}
} }
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) { void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) {
@ -4587,7 +4474,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
break; break;
} }
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true); // pOperator->pRuntimeEnv, true);
doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock);
@ -4860,7 +4747,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
doAggregateImpl(pOperator, 0, pInfo->pCtx); doAggregateImpl(pOperator, 0, pInfo->pCtx);
#if 0 // test for encode/decode result info #if 0 // test for encode/decode result info
@ -5086,7 +4973,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
// } // }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
@ -5172,7 +5059,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
@ -5267,7 +5154,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
@ -5312,8 +5199,8 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
} }
// restore the value // restore the value
@ -5362,7 +5249,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
@ -5387,57 +5274,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->binfo.pRes;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
// setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
// hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
pOperator->status = OP_RES_TO_RETURN;
// pQueryAttr->order.order = order; // TODO : restore the order
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
int64_t st = taosGetTimestampUs();
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->binfo.pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
@ -5541,7 +5377,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
break; break;
} }
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
doStateWindowAggImpl(pOperator, pInfo, pBlock); doStateWindowAggImpl(pOperator, pInfo, pBlock);
} }
@ -5589,7 +5425,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
doSessionWindowAggImpl(pOperator, pInfo, pBlock); doSessionWindowAggImpl(pOperator, pInfo, pBlock);
} }
@ -6480,8 +6316,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->pExpr->_function.num = 1; pExp->pExpr->_function.num = 1;
pExp->pExpr->_function.functionId = -1; pExp->pExpr->_function.functionId = -1;
int32_t type = nodeType(pTargetNode->pExpr);
// it is a project query, or group by column // it is a project query, or group by column
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { if (type == QUERY_NODE_COLUMN) {
pExp->pExpr->nodeType = QUERY_NODE_COLUMN; pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
@ -6492,7 +6329,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) { } else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE; pExp->pExpr->nodeType = QUERY_NODE_VALUE;
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr; SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
@ -6503,7 +6340,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
valueNodeToVariant(pValNode, &pExp->base.pParam[0].param); valueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { } else if (type == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
@ -6514,14 +6351,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->pExpr->_function.pFunctNode = pFuncNode; pExp->pExpr->_function.pFunctNode = pFuncNode;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
// TODO: value parameter needs to be handled
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
pExp->base.numOfParams = numOfParam; pExp->base.numOfParams = numOfParam;
for (int32_t j = 0; j < numOfParam; ++j) { for (int32_t j = 0; j < numOfParam; ++j) {
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
if (p1->type == QUERY_NODE_COLUMN) { if (p1->type == QUERY_NODE_COLUMN) {
SColumnNode* pcn = (SColumnNode*) p1; SColumnNode* pcn = (SColumnNode*) p1;
@ -6530,9 +6366,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
} else if (p1->type == QUERY_NODE_VALUE) { } else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1; SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
valueNodeToVariant(pvn, &pExp->base.pParam[j].param);
} }
} }
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) { } else if (type == QUERY_NODE_OPERATOR) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr; SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
@ -6541,11 +6378,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SDataType* pType = &pNode->node.resType; SDataType* pType = &pNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
// pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
// pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -284,7 +284,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) { if (pInfo->pScalarExprInfo != NULL) {

View File

@ -237,33 +237,45 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, int32_t minParaNum, int32_t maxParaNum, int32_t primaryParaNo) { static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, int32_t minParaNum, int32_t maxParaNum, bool hasSep) {
int32_t paraNum = LIST_LENGTH(pFunc->pParameterList); int32_t paraNum = LIST_LENGTH(pFunc->pParameterList);
if (paraNum < minParaNum || paraNum > maxParaNum) { if (paraNum < minParaNum || paraNum > maxParaNum) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
} }
uint8_t resultType = TSDB_DATA_TYPE_NCHAR; uint8_t resultType = TSDB_DATA_TYPE_BINARY;
int32_t resultBytes = 0; int32_t resultBytes = 0;
int32_t sepBytes = 0; int32_t sepBytes = 0;
for (int32_t i = 0; i < LIST_LENGTH(pFunc->pParameterList); ++i) {
/* For concat/concat_ws function, if params have NCHAR type, promote the final result to NCHAR */
for (int32_t i = 0; i < paraNum; ++i) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, i); SNode* pPara = nodesListGetNode(pFunc->pParameterList, i);
uint8_t paraType = ((SExprNode*)pPara)->resType.type; uint8_t paraType = ((SExprNode*)pPara)->resType.type;
int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
if (!IS_VAR_DATA_TYPE(paraType)) { if (!IS_VAR_DATA_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
if (i < primaryParaNo) { if (TSDB_DATA_TYPE_NCHAR == paraType) {
sepBytes = paraBytes; resultType = paraType;
continue;
} }
if (TSDB_DATA_TYPE_BINARY == paraType) {
resultType = TSDB_DATA_TYPE_BINARY;
}
resultBytes += paraBytes;
} }
if (sepBytes > 0) {
resultBytes += sepBytes * (paraNum - 2); for (int32_t i = 0; i < paraNum; ++i) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, i);
uint8_t paraType = ((SExprNode*)pPara)->resType.type;
int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
int32_t factor = 1;
if (TSDB_DATA_TYPE_NCHAR == resultType && TSDB_DATA_TYPE_VARCHAR == paraType) {
factor *= TSDB_NCHAR_SIZE;
}
resultBytes += paraBytes * factor;
if (i == 0) {
sepBytes = paraBytes * factor;
}
}
if (hasSep) {
resultBytes += sepBytes * (paraNum - 3);
} }
pFunc->node.resType = (SDataType) { .bytes = resultBytes, .type = resultType }; pFunc->node.resType = (SDataType) { .bytes = resultBytes, .type = resultType };
@ -271,11 +283,11 @@ static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
} }
static int32_t translateConcat(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateConcat(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateConcatImpl(pFunc, pErrBuf, len, 2, 8, 0); return translateConcatImpl(pFunc, pErrBuf, len, 2, 8, false);
} }
static int32_t translateConcatWs(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateConcatWs(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateConcatImpl(pFunc, pErrBuf, len, 3, 9, 1); return translateConcatImpl(pFunc, pErrBuf, len, 3, 9, true);
} }
static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {

View File

@ -1226,7 +1226,7 @@ void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
pVal->pz = pNode->datum.p + VARSTR_HEADER_SIZE; pVal->pz = pNode->datum.p;
break; break;
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL: case TSDB_DATA_TYPE_DECIMAL:

View File

@ -313,8 +313,8 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) { static int32_t concatCopyHelper(const char *input, char *output, bool hasNchar, int32_t type, int16_t *dataLen) {
if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) { if (hasNchar && type == TSDB_DATA_TYPE_VARCHAR) {
TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1); TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1);
bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL); bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL);
if (!ret) { if (!ret) {
@ -345,10 +345,6 @@ static int32_t getNumOfNullEntries(SColumnInfoData *pColumnInfoData, int32_t num
} }
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
return TSDB_CODE_FAILED;
}
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
SColumnInfoData *pOutputData = pOutput->columnData; SColumnInfoData *pOutputData = pOutput->columnData;
char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
@ -356,15 +352,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t inputLen = 0; int32_t inputLen = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
bool hasNcharCol = false; bool hasNchar = (GET_PARAM_TYPE(pOutput) == TSDB_DATA_TYPE_NCHAR) ? true : false;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
int32_t type = GET_PARAM_TYPE(&pInput[i]);
if (!IS_VAR_DATA_TYPE(type)) {
return TSDB_CODE_FAILED;
}
if (type == TSDB_DATA_TYPE_NCHAR) {
hasNcharCol = true;
}
if (pInput[i].numOfRows > numOfRows) { if (pInput[i].numOfRows > numOfRows) {
numOfRows = pInput[i].numOfRows; numOfRows = pInput[i].numOfRows;
} }
@ -373,7 +362,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
pInputData[i] = pInput[i].columnData; pInputData[i] = pInput[i].columnData;
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
int32_t factor = 1; int32_t factor = 1;
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { if (hasNchar && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE; factor = TSDB_NCHAR_SIZE;
} }
@ -405,7 +394,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int16_t dataLen = 0; int16_t dataLen = 0;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen); int32_t ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
@ -428,10 +417,6 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
return TSDB_CODE_FAILED;
}
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
SColumnInfoData *pOutputData = pOutput->columnData; SColumnInfoData *pOutputData = pOutput->columnData;
char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
@ -439,15 +424,8 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t inputLen = 0; int32_t inputLen = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
bool hasNcharCol = false; bool hasNchar = (GET_PARAM_TYPE(pOutput) == TSDB_DATA_TYPE_NCHAR) ? true : false;
for (int32_t i = 1; i < inputNum; ++i) { for (int32_t i = 1; i < inputNum; ++i) {
int32_t type = GET_PARAM_TYPE(&pInput[i]);
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) {
return TSDB_CODE_FAILED;
}
if (type == TSDB_DATA_TYPE_NCHAR) {
hasNcharCol = true;
}
if (pInput[i].numOfRows > numOfRows) { if (pInput[i].numOfRows > numOfRows) {
numOfRows = pInput[i].numOfRows; numOfRows = pInput[i].numOfRows;
} }
@ -456,7 +434,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
pInputData[i] = pInput[i].columnData; pInputData[i] = pInput[i].columnData;
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
int32_t factor = 1; int32_t factor = 1;
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { if (hasNchar && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE; factor = TSDB_NCHAR_SIZE;
} }
@ -487,7 +465,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
continue; continue;
} }
int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen); int32_t ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
@ -499,7 +477,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
if (i < inputNum - 1) { if (i < inputNum - 1) {
//insert the separator //insert the separator
char *sep = pInputData[0]->pData; char *sep = pInputData[0]->pData;
int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen); int32_t ret = concatCopyHelper(sep, output, hasNchar, GET_PARAM_TYPE(&pInput[0]), &dataLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }