Merge pull request #11268 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
ba29bc21f4
|
@ -110,10 +110,11 @@ typedef struct SFileBlockInfo {
|
|||
#define FUNCTION_COV 38
|
||||
|
||||
typedef struct SResultRowEntryInfo {
|
||||
int8_t hasResult; // result generated, not NULL value
|
||||
bool initialized; // output buffer has been initialized
|
||||
bool complete; // query has completed
|
||||
uint32_t numOfRes; // num of output result in current buffer
|
||||
// int8_t hasResult:6; // result generated, not NULL value
|
||||
bool initialized:1; // output buffer has been initialized
|
||||
bool complete:1; // query has completed
|
||||
uint8_t isNullRes:6; // the result is null
|
||||
uint8_t numOfRes; // num of output result in current buffer
|
||||
} SResultRowEntryInfo;
|
||||
|
||||
// determine the real data need to calculated the result
|
||||
|
@ -156,7 +157,6 @@ typedef struct SResultDataInfo {
|
|||
|
||||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
|
||||
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
||||
|
||||
typedef struct SInputColumnInfoData {
|
||||
int32_t totalRows; // total rows in current columnar data
|
||||
|
|
|
@ -483,13 +483,14 @@ typedef struct STableIntervalOperatorInfo {
|
|||
SOptrBasicInfo binfo; // basic info
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
SInterval interval; // interval info
|
||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||
STimeWindow win; // query time range
|
||||
bool timeWindowInterpo; // interpolation needed or not
|
||||
char **pRow; // previous row/tuple of already processed datablock
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
STableQueryInfo *pCurrent; // current tableQueryInfo struct
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
} STableIntervalOperatorInfo;
|
||||
|
@ -670,7 +671,7 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
|||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
|
||||
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||
|
|
|
@ -980,7 +980,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
|
|||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
||||
// keep it temporarialy
|
||||
// keep it temporarily
|
||||
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
||||
int32_t numOfRows = pCtx[k].input.numOfRows;
|
||||
int32_t startOffset = pCtx[k].input.startRowIndex;
|
||||
|
@ -1012,7 +1012,6 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
|
|||
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
|
||||
pCtx[k].sfp.process(&tw, 1, &out);
|
||||
pEntryInfo->numOfRes = 1;
|
||||
pEntryInfo->hasResult = ',';
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1511,7 +1510,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
|
||||
TSKEY* tsCols = NULL;
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] ==
|
||||
// pSDataBlock->info.window.ekey);
|
||||
|
@ -3144,11 +3143,6 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
|
|||
}
|
||||
|
||||
releaseBufPage(pBuf, bufPage);
|
||||
/*
|
||||
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
|
||||
* the top and bottom query
|
||||
*/
|
||||
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3285,7 +3279,8 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
SFilterInfo* filter = NULL;
|
||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
|
||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
|
||||
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
||||
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||
|
@ -3296,6 +3291,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
|||
SSDataBlock* px = createOneDataBlock(pBlock);
|
||||
blockDataEnsureCapacity(px, pBlock->info.rows);
|
||||
|
||||
// todo extract method
|
||||
int32_t numOfRow = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||
|
@ -3307,7 +3303,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
|||
continue;
|
||||
}
|
||||
|
||||
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
||||
if (colDataIsNull_s(pSrc, j)) {
|
||||
colDataAppendNULL(pDst, numOfRow);
|
||||
} else {
|
||||
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
||||
}
|
||||
numOfRow += 1;
|
||||
}
|
||||
|
||||
|
@ -3525,7 +3525,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
|
|||
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
||||
|
||||
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0);
|
||||
colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes);
|
||||
}
|
||||
|
||||
releaseBufPage(pBuf, page);
|
||||
|
@ -6355,7 +6355,7 @@ _error:
|
|||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
|
||||
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -6371,6 +6371,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->win.skey = 0;
|
||||
pInfo->win.ekey = INT64_MAX;
|
||||
|
||||
pInfo->primaryTsIndex = primaryTsSlot;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
|
||||
|
@ -7137,18 +7139,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
// todo: set the correct primary timestamp key column
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = TSDB_TIME_PRECISION_MILLI};
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
|
||||
SInterval interval = {
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = pIntervalPhyNode->precision
|
||||
};
|
||||
|
||||
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
|
|
|
@ -106,7 +106,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
|
|||
return true;
|
||||
}
|
||||
|
||||
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
|
@ -131,7 +131,7 @@ static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int3
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
||||
static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
||||
ASSERT(pKey != NULL);
|
||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||
|
||||
|
@ -170,11 +170,12 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
|
|||
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||
|
||||
// set result exists, todo refactor
|
||||
memcpy(dest, data, pColInfoData->info.bytes);
|
||||
pEntryInfo->hasResult = DATA_SET_FLAG;
|
||||
pEntryInfo->numOfRes = 1;
|
||||
} else { // it is a NULL value
|
||||
pEntryInfo->isNullRes = 1;
|
||||
}
|
||||
|
||||
pEntryInfo->numOfRes = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,7 +198,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||
if (!pInfo->isInit) {
|
||||
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
pInfo->isInit = true;
|
||||
num++;
|
||||
continue;
|
||||
|
@ -209,7 +210,14 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
continue;
|
||||
}
|
||||
|
||||
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||
// The first row of a new block does not belongs to the previous existed group
|
||||
if (!equal && j == 0) {
|
||||
num++;
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
continue;
|
||||
}
|
||||
|
||||
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -220,12 +228,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
num = 1;
|
||||
}
|
||||
|
||||
if (num > 0) {
|
||||
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
|
@ -288,8 +296,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
|||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
|
||||
while(1) {
|
||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
|
||||
doFilter(pInfo->pCondition, pRes);
|
||||
|
||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
break; \
|
||||
} \
|
||||
(_info)->numOfRes = (res); \
|
||||
(_info)->hasResult = DATA_SET_FLAG; \
|
||||
} while (0)
|
||||
|
||||
typedef struct SSumRes {
|
||||
|
@ -49,11 +48,11 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
|
||||
|
||||
void functionFinalize(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
doFinalizer(pResInfo);
|
||||
|
||||
cleanupResultRowEntry(pResInfo);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
|
||||
}
|
||||
|
||||
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
|
@ -715,7 +714,6 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
SET_VAL(pResInfo, notNullElems, 1);
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
|
||||
// TODO set the correct parameter.
|
||||
|
@ -775,9 +773,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
|
||||
// }
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true;
|
||||
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
|
@ -815,8 +811,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
|||
|
||||
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true; // set query completed on this column
|
||||
numOfElems++;
|
||||
break;
|
||||
|
@ -830,10 +824,8 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
|||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
if (pResInfo->numOfRes == 0 || (*(TSKEY*)buf) < ts) {
|
||||
memcpy(buf, data, pCtx->inputBytes);
|
||||
|
||||
*(TSKEY*)buf = ts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
|
|
|
@ -208,7 +208,7 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
|
|||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
|
||||
if (!pResInfo->hasResult) {
|
||||
if (pResInfo->numOfRes == 0) {
|
||||
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
|
||||
colDataAppend(pCol, j, NULL, true); // TODO add set null data api
|
||||
}
|
||||
|
|
|
@ -206,7 +206,7 @@ if $data02 != 2678400000 then
|
|||
endi
|
||||
|
||||
sql select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
|
||||
print ===> select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
|
||||
print ===> rows: $rows
|
||||
print ===> rows0: $data00 $data01 $data02 $data03 $data04
|
||||
print ===> rows1: $data10 $data11 $data12 $data13 $data14
|
||||
|
@ -219,6 +219,7 @@ if $data00 != @21-11-30 08:00:00.000@ then
|
|||
return -1
|
||||
endi
|
||||
if $data01 != NULL then
|
||||
print expect null, actual: $data01
|
||||
return -1
|
||||
endi
|
||||
if $data31 != $data34 then
|
||||
|
|
Loading…
Reference in New Issue