Merge pull request #20146 from taosdata/fix/TD-22538
fix:Ignore expression exception & add some log
This commit is contained in:
commit
be0e4b53e7
|
@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id);
|
||||||
ASSERT(pInfo->validBlockIndex == 0);
|
ASSERT(pInfo->validBlockIndex == 0);
|
||||||
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||||
|
|
||||||
|
|
|
@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
||||||
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
|
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
|
||||||
SWinKey key = {.groupId = groupId, .ts = pRow->key};
|
SWinKey key = {.groupId = groupId, .ts = pRow->key};
|
||||||
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
|
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
|
||||||
|
qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1064,11 +1064,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||||
pInfo->scalarSup.numOfExprs, NULL);
|
pInfo->scalarSup.numOfExprs, NULL);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
taosHashClear(pInfo->pPartitions);
|
taosHashClear(pInfo->pPartitions);
|
||||||
doStreamHashPartitionImpl(pInfo, pBlock);
|
doStreamHashPartitionImpl(pInfo, pBlock);
|
||||||
|
|
|
@ -1538,16 +1538,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
|
||||||
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
|
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
|
||||||
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
|
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
|
||||||
tw.ekey, tmpKey.groupId, mark);
|
tw.ekey, tmpKey.groupId, mark);
|
||||||
} else {
|
} else {
|
||||||
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
||||||
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
|
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
|
||||||
key->groupId, mark);
|
key->groupId, mark);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
||||||
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
|
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
|
||||||
key->groupId, mark);
|
key->groupId, mark);
|
||||||
}
|
}
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
|
Loading…
Reference in New Issue