From 84cf3ec69660a8c5f521ccfcbefee5458b572c85 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 17 Feb 2023 14:20:45 +0800 Subject: [PATCH 1/2] fix:add log --- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/groupoperator.c | 5 +---- source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 366ef09e60..e5e9b45576 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - +///// numOfBlocks总和 日志 ASSERT(pInfo->validBlockIndex == 0); ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 161fe52367..aa61d24b92 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1064,11 +1064,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->scalarSup.pExprInfo != NULL) { - pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, pTaskInfo->code); - } } taosHashClear(pInfo->pPartitions); doStreamHashPartitionImpl(pInfo, pBlock); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 71e1068fb3..05f26332f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1798,7 +1798,7 @@ FETCH_NEXT_BLOCK: /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } - +//////todo int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { From 40f9aaa7153327f05fb8096d888bd2e10ba3ac5e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 24 Feb 2023 11:00:08 +0800 Subject: [PATCH 2/2] fix:Ignore expression exception & add some log --- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 6 +++--- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e5e9b45576..82f079e2fb 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; -///// numOfBlocks总和 日志 + qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id); ASSERT(pInfo->validBlockIndex == 0); ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 091b10a63e..a1426e2a96 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) { SWinKey key = {.groupId = groupId, .ts = pRow->key}; int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len); + qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); ASSERT(code == TSDB_CODE_SUCCESS); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 05f26332f2..71e1068fb3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1798,7 +1798,7 @@ FETCH_NEXT_BLOCK: /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } -//////todo + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 54be30028e..62d68d5ca2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1538,16 +1538,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0); if (code == TSDB_CODE_SUCCESS) { STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval); - qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, + qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, tmpKey.groupId, mark); } else { STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); - qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, key->groupId, mark); } } else { STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); - qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey, key->groupId, mark); } streamStateFreeCur(pCur);