fix: support window pseduo column for fill operator

This commit is contained in:
shenglian zhou 2022-08-24 17:05:38 +08:00
parent b62e0c0235
commit b250efeee4
7 changed files with 121 additions and 118 deletions

View File

@ -181,7 +181,7 @@ typedef struct SColumn {
int16_t slotId; int16_t slotId;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) int16_t colType; // column type: normal column, tag, or window column
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
uint8_t precision; uint8_t precision;

View File

@ -57,7 +57,9 @@ typedef enum EColumnType {
COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_COLUMN = 1,
COLUMN_TYPE_TAG, COLUMN_TYPE_TAG,
COLUMN_TYPE_TBNAME, COLUMN_TYPE_TBNAME,
COLUMN_TYPE_WINDOW_PC, COLUMN_TYPE_WINDOW_START,
COLUMN_TYPE_WINDOW_END,
COLUMN_TYPE_WINDOW_DURATION,
COLUMN_TYPE_GROUP_KEY COLUMN_TYPE_GROUP_KEY
} EColumnType; } EColumnType;

View File

@ -319,7 +319,9 @@ static EDealRes getColumn(SNode** pNode, void* pContext) {
if (!data) { if (!data) {
taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode))); taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
pSColumnNode->slotId = pData->index++; pSColumnNode->slotId = pData->index++;
SColumnInfo cInfo = {.colId = pSColumnNode->colId, .type = pSColumnNode->node.resType.type, .bytes = pSColumnNode->node.resType.bytes}; SColumnInfo cInfo = {.colId = pSColumnNode->colId,
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
#if TAG_FILTER_DEBUG #if TAG_FILTER_DEBUG
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type); qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif #endif
@ -984,7 +986,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s; return s;
} }
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) { static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
if (pCol == NULL) { if (pCol == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -998,7 +1000,7 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
pCol->scale = pType->scale; pCol->scale = pType->scale;
pCol->precision = pType->precision; pCol->precision = pType->precision;
pCol->dataBlockId = blockId; pCol->dataBlockId = blockId;
pCol->colType = colType;
return pCol; return pCol;
} }
@ -1042,7 +1044,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SDataType* pType = &pColNode->node.resType; SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pColNode->colName); pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType); pExp->base.pParam[0].pCol =
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (type == QUERY_NODE_VALUE) { } else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE; pExp->pExpr->nodeType = QUERY_NODE_VALUE;
@ -1094,7 +1097,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SColumnNode* pcn = (SColumnNode*)p1; SColumnNode* pcn = (SColumnNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType); pExp->base.pParam[j].pCol =
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
} else if (p1->type == QUERY_NODE_VALUE) { } else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1; SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;

View File

@ -148,20 +148,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock,
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
// setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return false;
}
if (pStatis != NULL && pStatis->numOfNull == 0) {
return false;
}
return true;
}
#if 0 #if 0
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData, static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {

View File

@ -76,6 +76,11 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32
} }
} }
static bool fillWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData, int32_t rowIndex) {
//fill windows pseduo column, _wstart, _wend, _wduration and return true
return false;
}
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts, static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
bool outOfBound) { bool outOfBound) {
SPoint point1, point2, point; SPoint point1, point2, point;

View File

@ -1918,8 +1918,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) { if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
} else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
(pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) { ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already. // The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) { if (j == 0 && pRowSup->startRowIndex != 0) {

View File

@ -44,12 +44,15 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
pCol->colType = COLUMN_TYPE_TBNAME; pCol->colType = COLUMN_TYPE_TBNAME;
break; break;
case FUNCTION_TYPE_WSTART: case FUNCTION_TYPE_WSTART:
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colType = COLUMN_TYPE_WINDOW_START;
break;
case FUNCTION_TYPE_WEND: case FUNCTION_TYPE_WEND:
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colType = COLUMN_TYPE_WINDOW_PC; pCol->colType = COLUMN_TYPE_WINDOW_END;
break; break;
case FUNCTION_TYPE_WDURATION: case FUNCTION_TYPE_WDURATION:
pCol->colType = COLUMN_TYPE_WINDOW_PC; pCol->colType = COLUMN_TYPE_WINDOW_DURATION;
break; break;
case FUNCTION_TYPE_GROUP_KEY: case FUNCTION_TYPE_GROUP_KEY:
pCol->colType = COLUMN_TYPE_GROUP_KEY; pCol->colType = COLUMN_TYPE_GROUP_KEY;
@ -784,7 +787,10 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) { static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
if (COLUMN_TYPE_WINDOW_PC != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) { if (COLUMN_TYPE_WINDOW_START != pCol->colType &&
COLUMN_TYPE_WINDOW_END != pCol->colType &&
COLUMN_TYPE_WINDOW_DURATION != pCol->colType &&
COLUMN_TYPE_GROUP_KEY != pCol->colType) {
*(bool*)pContext = true; *(bool*)pContext = true;
return DEAL_RES_END; return DEAL_RES_END;
} }