Merge pull request #23542 from taosdata/fix/TD-27041
adj builtin function
This commit is contained in:
commit
acd20506f6
|
@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool clearState;
|
||||
bool recvGetAll;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
SArray* historyWins;
|
||||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool recvGetAll;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
|
|
|
@ -2535,6 +2535,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
taosMemoryFree(buf);
|
||||
}
|
||||
|
||||
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
|
||||
SResultWindowInfo* pResInfo = pIte;
|
||||
pResInfo->pStatePos->beUsed = true;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -2549,6 +2558,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
if (opRes) {
|
||||
return opRes;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2586,6 +2601,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -2841,6 +2857,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->clearState = false;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
@ -3457,6 +3475,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -3485,6 +3508,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -3715,6 +3739,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
}
|
||||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
|
|
@ -3312,26 +3312,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = castFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_iso8601",
|
||||
.type = FUNCTION_TYPE_TO_ISO8601,
|
||||
|
@ -3709,6 +3689,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = qVgIdFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
|
||||
};
|
||||
// clang-format on
|
||||
|
|
Loading…
Reference in New Issue