Merge pull request #16395 from taosdata/szhou/save-op-state
fix: support window pseduo column _wstart,_wend,_wduration for fill operator
This commit is contained in:
commit
689caa1f97
|
@ -205,7 +205,7 @@ typedef struct SColumn {
|
|||
int16_t slotId;
|
||||
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
|
||||
int16_t colType; // column type: normal column, tag, or window column
|
||||
int16_t type;
|
||||
int32_t bytes;
|
||||
uint8_t precision;
|
||||
|
|
|
@ -57,7 +57,9 @@ typedef enum EColumnType {
|
|||
COLUMN_TYPE_COLUMN = 1,
|
||||
COLUMN_TYPE_TAG,
|
||||
COLUMN_TYPE_TBNAME,
|
||||
COLUMN_TYPE_WINDOW_PC,
|
||||
COLUMN_TYPE_WINDOW_START,
|
||||
COLUMN_TYPE_WINDOW_END,
|
||||
COLUMN_TYPE_WINDOW_DURATION,
|
||||
COLUMN_TYPE_GROUP_KEY
|
||||
} EColumnType;
|
||||
|
||||
|
|
|
@ -319,7 +319,9 @@ static EDealRes getColumn(SNode** pNode, void* pContext) {
|
|||
if (!data) {
|
||||
taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
|
||||
pSColumnNode->slotId = pData->index++;
|
||||
SColumnInfo cInfo = {.colId = pSColumnNode->colId, .type = pSColumnNode->node.resType.type, .bytes = pSColumnNode->node.resType.bytes};
|
||||
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
|
||||
.type = pSColumnNode->node.resType.type,
|
||||
.bytes = pSColumnNode->node.resType.bytes};
|
||||
#if TAG_FILTER_DEBUG
|
||||
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
|
||||
#endif
|
||||
|
@ -987,7 +989,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
|
|||
return s;
|
||||
}
|
||||
|
||||
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) {
|
||||
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
|
||||
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
if (pCol == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1001,7 +1003,7 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
|
|||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
pCol->dataBlockId = blockId;
|
||||
|
||||
pCol->colType = colType;
|
||||
return pCol;
|
||||
}
|
||||
|
||||
|
@ -1045,7 +1047,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
SDataType* pType = &pColNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||
pType->precision, pColNode->colName);
|
||||
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType);
|
||||
pExp->base.pParam[0].pCol =
|
||||
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
} else if (type == QUERY_NODE_VALUE) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
||||
|
@ -1097,7 +1100,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
SColumnNode* pcn = (SColumnNode*)p1;
|
||||
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType);
|
||||
pExp->base.pParam[j].pCol =
|
||||
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
|
||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||
SValueNode* pvn = (SValueNode*)p1;
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||
|
|
|
@ -148,20 +148,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock,
|
|||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||
|
||||
// setup the output buffer for each operator
|
||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
|
||||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pStatis != NULL && pStatis->numOfNull == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
|
||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
|
||||
|
||||
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||
static bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData, int32_t rowIndex);
|
||||
|
||||
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
|
@ -43,9 +44,8 @@ static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowInd
|
|||
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
||||
SColumnInfoData* pDstColInfo = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
if (pCol->notFillCol) {
|
||||
if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfo, rowIndex);
|
||||
if (!filled) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstColInfo, rowIndex, pKey);
|
||||
|
@ -76,6 +76,35 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32
|
|||
}
|
||||
}
|
||||
|
||||
//fill windows pseudo column, _wstart, _wend, _wduration and return true, otherwise return false
|
||||
static bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData, int32_t rowIndex) {
|
||||
if (!pCol->notFillCol) {
|
||||
return false;
|
||||
}
|
||||
if (pCol->pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||
if (pCol->pExpr->base.numOfParams != 1) {
|
||||
return false;
|
||||
}
|
||||
if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
|
||||
colDataAppend(pDstColInfoData, rowIndex, (const char*)&pFillInfo->currentKey, false);
|
||||
return true;
|
||||
} else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_END) {
|
||||
//TODO: include endpoint
|
||||
SInterval* pInterval = &pFillInfo->interval;
|
||||
int32_t step = (pFillInfo->order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
int64_t windowEnd =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
colDataAppend(pDstColInfoData, rowIndex, (const char*)&windowEnd, false);
|
||||
return true;
|
||||
} else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_DURATION) {
|
||||
//TODO: include endpoint
|
||||
colDataAppend(pDstColInfoData, rowIndex, (const char*)&pFillInfo->interval.sliding, false);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
|
||||
bool outOfBound) {
|
||||
SPoint point1, point2, point;
|
||||
|
@ -92,10 +121,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||
|
||||
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index);
|
||||
if (!filled) {
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstColInfoData, index, pKey);
|
||||
}
|
||||
|
@ -106,10 +133,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||
|
||||
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index);
|
||||
if (!filled) {
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstColInfoData, index, pKey);
|
||||
}
|
||||
|
@ -127,9 +152,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
int16_t type = pDstCol->info.type;
|
||||
|
||||
if (pCol->notFillCol) {
|
||||
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, index);
|
||||
if (!filled) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstCol, index, pKey);
|
||||
|
@ -170,9 +194,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
if (pCol->notFillCol) {
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, index);
|
||||
if (!filled) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDst, index, pKey);
|
||||
|
|
|
@ -1947,8 +1947,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
} else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap ||
|
||||
(pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) {
|
||||
} else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
|
||||
((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
|
||||
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
if (j == 0 && pRowSup->startRowIndex != 0) {
|
||||
|
|
|
@ -44,12 +44,15 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
|
|||
pCol->colType = COLUMN_TYPE_TBNAME;
|
||||
break;
|
||||
case FUNCTION_TYPE_WSTART:
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_START;
|
||||
break;
|
||||
case FUNCTION_TYPE_WEND:
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_PC;
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_END;
|
||||
break;
|
||||
case FUNCTION_TYPE_WDURATION:
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_PC;
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_DURATION;
|
||||
break;
|
||||
case FUNCTION_TYPE_GROUP_KEY:
|
||||
pCol->colType = COLUMN_TYPE_GROUP_KEY;
|
||||
|
@ -784,7 +787,10 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_WINDOW_PC != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) {
|
||||
if (COLUMN_TYPE_WINDOW_START != pCol->colType &&
|
||||
COLUMN_TYPE_WINDOW_END != pCol->colType &&
|
||||
COLUMN_TYPE_WINDOW_DURATION != pCol->colType &&
|
||||
COLUMN_TYPE_GROUP_KEY != pCol->colType) {
|
||||
*(bool*)pContext = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue