Merge branch 'main' into fix/droptask
This commit is contained in:
commit
c726420081
|
@ -0,0 +1,315 @@
|
||||||
|
name: TDengine CI Test
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- 'main'
|
||||||
|
- '3.0'
|
||||||
|
- '3.1'
|
||||||
|
paths-ignore:
|
||||||
|
- 'packaging/**'
|
||||||
|
- 'docs/**'
|
||||||
|
repository_dispatch:
|
||||||
|
types: [run-tests]
|
||||||
|
|
||||||
|
concurrency:
|
||||||
|
group: ${{ github.workflow }}-${{ github.ref }}
|
||||||
|
cancel-in-progress: true
|
||||||
|
|
||||||
|
env:
|
||||||
|
CONTAINER_NAME: 'taosd-test'
|
||||||
|
WKDIR: '/var/lib/jenkins/workspace'
|
||||||
|
WK: '/var/lib/jenkins/workspace/TDinternal'
|
||||||
|
WKC: '/var/lib/jenkins/workspace/TDinternal/community'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
fetch-parameters:
|
||||||
|
runs-on:
|
||||||
|
group: CI
|
||||||
|
labels: [self-hosted, Linux, X64, testing]
|
||||||
|
outputs:
|
||||||
|
tdinternal: ${{ steps.parameters.outputs.tdinternal }}
|
||||||
|
run_function_test: ${{ steps.parameters.outputs.run_function_test }}
|
||||||
|
run_tdgpt_test: ${{ steps.parameters.outputs.run_tdgpt_test }}
|
||||||
|
source_branch: ${{ steps.parameters.outputs.source_branch }}
|
||||||
|
target_branch: ${{ steps.parameters.outputs.target_branch }}
|
||||||
|
pr_number: ${{ steps.parameters.outputs.pr_number }}
|
||||||
|
steps:
|
||||||
|
- name: Determine trigger source and fetch parameters
|
||||||
|
id: parameters
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
# check the trigger source and get branch information
|
||||||
|
if [ "${{ github.event_name }}" == "repository_dispatch" ]; then
|
||||||
|
tdinternal="true"
|
||||||
|
source_branch=${{ github.event.client_payload.tdinternal_source_branch }}
|
||||||
|
target_branch=${{ github.event.client_payload.tdinternal_target_branch }}
|
||||||
|
pr_number=${{ github.event.client_payload.tdinternal_pr_number }}
|
||||||
|
run_tdgpt_test="true"
|
||||||
|
run_function_test="true"
|
||||||
|
else
|
||||||
|
tdinternal="false"
|
||||||
|
source_branch=${{ github.event.pull_request.head.ref }}
|
||||||
|
target_branch=${{ github.event.pull_request.base.ref }}
|
||||||
|
pr_number=${{ github.event.pull_request.number }}
|
||||||
|
|
||||||
|
# check whether to run tdgpt test cases
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD $target_branch`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :)
|
||||||
|
|
||||||
|
if [[ "$changed_files_non_doc" != '' && "$changed_files_non_doc" =~ /forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics/ ]]; then
|
||||||
|
run_tdgpt_test="true"
|
||||||
|
else
|
||||||
|
run_tdgpt_test="false"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# check whether to run function test cases
|
||||||
|
changed_files_non_tdgpt=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD $target_branch`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -Ev "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics" | tr '\n' ' ' ||:)
|
||||||
|
if [ $changed_files_non_tdgpt != '' ]; then
|
||||||
|
run_function_test="true"
|
||||||
|
else
|
||||||
|
run_function_test="false"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "tdinternal=$tdinternal" >> $GITHUB_OUTPUT
|
||||||
|
echo "run_function_test=$run_function_test" >> $GITHUB_OUTPUT
|
||||||
|
echo "run_tdgpt_test=$run_tdgpt_test" >> $GITHUB_OUTPUT
|
||||||
|
echo "source_branch=$source_branch" >> $GITHUB_OUTPUT
|
||||||
|
echo "target_branch=$target_branch" >> $GITHUB_OUTPUT
|
||||||
|
echo "pr_number=$pr_number" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
|
run-tests-on-linux:
|
||||||
|
needs: fetch-parameters
|
||||||
|
runs-on:
|
||||||
|
group: CI
|
||||||
|
labels: [self-hosted, Linux, X64, testing]
|
||||||
|
timeout-minutes: 200
|
||||||
|
env:
|
||||||
|
IS_TDINTERNAL: ${{ needs.fetch-parameters.outputs.tdinternal }}
|
||||||
|
RUN_RUNCTION_TEST: ${{ needs.fetch-parameters.outputs.run_function_test }}
|
||||||
|
RUN_TDGPT_TEST: ${{ needs.fetch-parameters.outputs.run_tdgpt_tests }}
|
||||||
|
SOURCE_BRANCH: ${{ needs.fetch-parameters.outputs.source_branch }}
|
||||||
|
TARGET_BRANCH: ${{ needs.fetch-parameters.outputs.target_branch }}
|
||||||
|
PR_NUMBER: ${{ needs.fetch-parameters.outputs.pr_number }}
|
||||||
|
steps:
|
||||||
|
- name: Output the environment information
|
||||||
|
run: |
|
||||||
|
echo "::group::Environment Info"
|
||||||
|
date
|
||||||
|
hostname
|
||||||
|
env
|
||||||
|
echo "Runner: ${{ runner.name }}"
|
||||||
|
echo "Trigger Source from TDinternal: ${{ env.IS_TDINTERNAL }}"
|
||||||
|
echo "Workspace: ${{ env.WKDIR }}"
|
||||||
|
git --version
|
||||||
|
echo "${{ env.WKDIR }}/restore.sh -p ${{ env.PR_NUMBER }} -n ${{ github.run_number }} -c ${{ env.CONTAINER_NAME }}"
|
||||||
|
echo "::endgroup::"
|
||||||
|
|
||||||
|
- name: Prepare repositories
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
prepare_environment() {
|
||||||
|
cd "$1"
|
||||||
|
git reset --hard
|
||||||
|
git clean -f
|
||||||
|
git remote prune origin
|
||||||
|
git fetch
|
||||||
|
git checkout "$2"
|
||||||
|
}
|
||||||
|
prepare_environment "${{ env.WK }}" "${{ env.TARGET_BRANCH }}"
|
||||||
|
prepare_environment "${{ env.WKC }}" "${{ env.TARGET_BRANCH }}"
|
||||||
|
|
||||||
|
- name: Get latest codes and logs for TDinternal PR
|
||||||
|
if: ${{ env.IS_TDINTERNAL == 'true' }}
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WK }}
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "`date "+%Y%m%d-%H%M%S"` TDinternalTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
|
||||||
|
git checkout -qf FETCH_HEAD
|
||||||
|
git log -5
|
||||||
|
echo "TDinternal log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git remote prune origin
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
- name: Get latest codes and logs for TDengine PR
|
||||||
|
if: ${{ env.IS_TDINTERNAL == 'false' }}
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git remote prune origin
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "`date "+%Y%m%d-%H%M%S"` TDengineTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
|
||||||
|
git checkout -qf FETCH_HEAD
|
||||||
|
git log -5
|
||||||
|
echo "community log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
cd ${{ env.WK }}
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
- name: Update submodule
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git submodule update --init --recursive
|
||||||
|
- name: Output the 'file_no_doc_changed' information to the file
|
||||||
|
if: ${{ env.IS_TDINTERNAL == 'false' }}
|
||||||
|
run: |
|
||||||
|
mkdir -p ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }}
|
||||||
|
changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ env.TARGET_BRANCH }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :)
|
||||||
|
echo $changed_files_non_doc > ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }}/docs_changed.txt
|
||||||
|
- name: Check assert testing
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
./run_check_assert_container.sh -d ${{ env.WKDIR }}
|
||||||
|
- name: Check void function testing
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
./run_check_void_container.sh -d ${{ env.WKDIR }}
|
||||||
|
- name: Build docker container
|
||||||
|
run: |
|
||||||
|
date
|
||||||
|
rm -rf ${{ env.WKC }}/debug
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
time ./container_build.sh -w ${{ env.WKDIR }} -e
|
||||||
|
- name: Get parameters for testing
|
||||||
|
id: get_param
|
||||||
|
run: |
|
||||||
|
log_server_file="/home/log_server.json"
|
||||||
|
timeout_cmd=""
|
||||||
|
extra_param=""
|
||||||
|
|
||||||
|
if [ -f "$log_server_file" ]; then
|
||||||
|
log_server_enabled=$(jq '.enabled' "$log_server_file")
|
||||||
|
timeout_param=$(jq '.timeout' "$log_server_file")
|
||||||
|
if [ "$timeout_param" != "null" ] && [ "$timeout_param" != "0" ]; then
|
||||||
|
timeout_cmd="timeout $timeout_param"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ "$log_server_enabled" == "1" ]; then
|
||||||
|
log_server=$(jq '.server' "$log_server_file" | sed 's/\\\"//g')
|
||||||
|
if [ "$log_server" != "null" ] && [ "$log_server" != "" ]; then
|
||||||
|
extra_param="-w $log_server"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
echo "timeout_cmd=$timeout_cmd" >> $GITHUB_OUTPUT
|
||||||
|
echo "extra_param=$extra_param" >> $GITHUB_OUTPUT
|
||||||
|
- name: Run function returns with a null pointer scan testing
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
./run_scan_container.sh -d ${{ env.WKDIR }} -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -f ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }}/docs_changed.txt ${{ steps.get_param.outputs.extra_param }}
|
||||||
|
- name: Run tdgpt test cases
|
||||||
|
if: ${{ env.IS_TDINTERNAL }} == 'false' && ${{ env.RUN_TDGPT_TEST }} == 'true'
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
export DEFAULT_RETRY_TIME=2
|
||||||
|
date
|
||||||
|
timeout 600 time ./run.sh -e -m /home/m.json -t tdgpt_cases.task -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -l ${{ env.WKDIR }}/log -o 300 ${{ steps.get_param.outputs.extra_param }}
|
||||||
|
- name: Run function test cases
|
||||||
|
if: ${{ env.RUN_RUNCTION_TEST }} == 'true'
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}/tests/parallel_test
|
||||||
|
export DEFAULT_RETRY_TIME=2
|
||||||
|
date
|
||||||
|
${{ steps.get_param.outputs.timeout_cmd }} time ./run.sh -e -m /home/m.json -t cases.task -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -l ${{ env.WKDIR }}/log -o 1200 ${{ steps.get_param.outputs.extra_param }}
|
||||||
|
|
||||||
|
run-tests-on-mac:
|
||||||
|
needs: fetch-parameters
|
||||||
|
if: ${{ needs.fetch-parameters.outputs.run_function_test == 'false' }}
|
||||||
|
runs-on:
|
||||||
|
group: CI
|
||||||
|
labels: [self-hosted, macOS, ARM64, testing]
|
||||||
|
timeout-minutes: 60
|
||||||
|
env:
|
||||||
|
IS_TDINTERNAL: ${{ needs.fetch-parameters.outputs.tdinternal }}
|
||||||
|
SOURCE_BRANCH: ${{ needs.fetch-parameters.outputs.source_branch }}
|
||||||
|
TARGET_BRANCH: ${{ needs.fetch-parameters.outputs.target_branch }}
|
||||||
|
PR_NUMBER: ${{ needs.fetch-parameters.outputs.pr_number }}
|
||||||
|
steps:
|
||||||
|
- name: Output the environment information
|
||||||
|
run: |
|
||||||
|
echo "::group::Environment Info"
|
||||||
|
date
|
||||||
|
hostname
|
||||||
|
env
|
||||||
|
echo "Runner: ${{ runner.name }}"
|
||||||
|
echo "Trigger Source from TDinternal: ${{ env.IS_TDINTERNAL }}"
|
||||||
|
echo "Workspace: ${{ env.WKDIR }}"
|
||||||
|
git --version
|
||||||
|
echo "${{ env.WKDIR }}/restore.sh -p ${{ env.PR_NUMBER }} -n ${{ github.run_number }} -c ${{ env.CONTAINER_NAME }}"
|
||||||
|
echo "::endgroup::"
|
||||||
|
- name: Prepare repositories
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
prepare_environment() {
|
||||||
|
cd "$1"
|
||||||
|
git reset --hard
|
||||||
|
git clean -f
|
||||||
|
git remote prune origin
|
||||||
|
git fetch
|
||||||
|
git checkout "$2"
|
||||||
|
}
|
||||||
|
prepare_environment "${{ env.WK }}" "${{ env.TARGET_BRANCH }}"
|
||||||
|
prepare_environment "${{ env.WKC }}" "${{ env.TARGET_BRANCH }}"
|
||||||
|
- name: Get latest codes and logs for TDinternal PR
|
||||||
|
if: ${{ env.IS_TDINTERNAL == 'true' }}
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WK }}
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "`date "+%Y%m%d-%H%M%S"` TDinternalTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
|
||||||
|
git checkout -qf FETCH_HEAD
|
||||||
|
git log -5
|
||||||
|
echo "TDinternal log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git remote prune origin
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
- name: Get latest codes and logs for TDengine PR
|
||||||
|
if: ${{ env.IS_TDINTERNAL == 'false' }}
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git remote prune origin
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "`date "+%Y%m%d-%H%M%S"` TDengineTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
|
||||||
|
git checkout -qf FETCH_HEAD
|
||||||
|
git log -5
|
||||||
|
echo "community log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
cd ${{ env.WK }}
|
||||||
|
git pull >/dev/null
|
||||||
|
git log -5
|
||||||
|
echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
|
||||||
|
- name: Update submodule
|
||||||
|
run: |
|
||||||
|
cd ${{ env.WKC }}
|
||||||
|
git submodule update --init --recursive
|
||||||
|
- name: Run tests
|
||||||
|
run: |
|
||||||
|
date
|
||||||
|
cd ${{ env.WK }}
|
||||||
|
rm -rf debug
|
||||||
|
mkdir debug
|
||||||
|
cd ${{ env.WK }}/debug
|
||||||
|
echo $PATH
|
||||||
|
echo "PATH=/opt/homebrew/bin:$PATH" >> $GITHUB_ENV
|
||||||
|
cmake .. -DBUILD_TEST=true -DBUILD_HTTPS=false -DCMAKE_BUILD_TYPE=Release
|
||||||
|
make -j10
|
||||||
|
ctest -j10 || exit 7
|
||||||
|
date
|
|
@ -1,17 +1,14 @@
|
||||||
name: TDengine Doc Build
|
name: TDengine Doc Build
|
||||||
|
|
||||||
on:
|
on:
|
||||||
workflow_call:
|
pull_request:
|
||||||
inputs:
|
branches:
|
||||||
target_branch:
|
- 'main'
|
||||||
description: "Target branch name of for building the document"
|
- '3.0'
|
||||||
required: true
|
- '3.1'
|
||||||
type: string
|
paths:
|
||||||
|
- 'docs/**'
|
||||||
target_pr_number:
|
- '*.md'
|
||||||
description: "PR number of target branch to merge for building the document"
|
|
||||||
required: true
|
|
||||||
type: string
|
|
||||||
|
|
||||||
env:
|
env:
|
||||||
DOC_WKC: "/root/doc_ci_work"
|
DOC_WKC: "/root/doc_ci_work"
|
||||||
|
@ -21,81 +18,32 @@ env:
|
||||||
TOOLS_REPO: "taos-tools"
|
TOOLS_REPO: "taos-tools"
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
check:
|
build-doc:
|
||||||
runs-on:
|
|
||||||
group: CI
|
|
||||||
labels: [self-hosted, doc-build]
|
|
||||||
outputs:
|
|
||||||
changed_files_zh: ${{ steps.set_output.outputs.changed_files_zh }}
|
|
||||||
changed_files_en: ${{ steps.set_output.outputs.changed_files_en }}
|
|
||||||
changed_files_non_doc: ${{ steps.set_output.outputs.changed_files_non_doc }}
|
|
||||||
changed_files_non_tdgpt: ${{ steps.set_output.outputs.changed_files_non_tdgpt }}
|
|
||||||
steps:
|
|
||||||
- name: Get the latest document contents from the repository
|
|
||||||
run: |
|
|
||||||
set -e
|
|
||||||
# ./.github/scripts/update_repo.sh ${{ env.DOC_WKC }}/${{ env.TD_REPO }} ${{ inputs.target_branch }} ${{ inputs.target_pr_number }}
|
|
||||||
cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
|
|
||||||
git reset --hard
|
|
||||||
git clean -f
|
|
||||||
git remote prune origin
|
|
||||||
git fetch
|
|
||||||
git checkout ${{ inputs.target_branch }}
|
|
||||||
git pull >/dev/null
|
|
||||||
git fetch origin +refs/pull/${{ inputs.target_pr_number }}/merge
|
|
||||||
git checkout -qf FETCH_HEAD
|
|
||||||
- name: Check whether the document is changed and set output variables
|
|
||||||
id: set_output
|
|
||||||
run: |
|
|
||||||
set -e
|
|
||||||
cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
|
|
||||||
changed_files_zh=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`| grep "^docs/zh/" | tr '\n' ' ' || :)
|
|
||||||
changed_files_en=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`| grep "^docs/en/" | tr '\n' ' ' || :)
|
|
||||||
changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :)
|
|
||||||
changed_files_non_tdgpt=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -Ev "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics" | tr '\n' ' ' ||:)
|
|
||||||
echo "changed_files_zh=${changed_files_zh}" >> $GITHUB_OUTPUT
|
|
||||||
echo "changed_files_en=${changed_files_en}" >> $GITHUB_OUTPUT
|
|
||||||
echo "changed_files_non_doc=${changed_files_non_doc}" >> $GITHUB_OUTPUT
|
|
||||||
echo "changed_files_non_tdgpt=${changed_files_non_tdgpt}" >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
build:
|
|
||||||
needs: check
|
|
||||||
runs-on:
|
runs-on:
|
||||||
group: CI
|
group: CI
|
||||||
labels: [self-hosted, doc-build]
|
labels: [self-hosted, doc-build]
|
||||||
if: ${{ needs.check.outputs.changed_files_zh != '' || needs.check.outputs.changed_files_en != '' }}
|
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Get the latest document contents
|
- name: Get the latest document contents
|
||||||
run: |
|
run: |
|
||||||
set -e
|
set -e
|
||||||
#./.github/scripts/update_repo.sh ${{ env.DOC_WKC }}/${{ env.TD_REPO }} ${{ inputs.target_branch }} ${{ inputs.target_pr_number }}
|
|
||||||
cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
|
cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
|
||||||
git reset --hard
|
git reset --hard
|
||||||
git clean -f
|
git clean -f
|
||||||
git remote prune origin
|
git remote prune origin
|
||||||
git fetch
|
git fetch
|
||||||
git checkout ${{ inputs.target_branch }}
|
git checkout ${{ github.event.pull_request.base.ref }}
|
||||||
git pull >/dev/null
|
git pull >/dev/null
|
||||||
git fetch origin +refs/pull/${{ inputs.target_pr_number }}/merge
|
git fetch origin +refs/pull/${{ github.event.pull_request.number }}/merge
|
||||||
git checkout -qf FETCH_HEAD
|
git checkout -qf FETCH_HEAD
|
||||||
|
|
||||||
- name: Build the chinese document
|
- name: Build the chinese document
|
||||||
if: ${{ needs.check.outputs.changed_files_zh != '' }}
|
|
||||||
run: |
|
run: |
|
||||||
cd ${{ env.DOC_WKC }}/${{ env.ZH_DOC_REPO }}
|
cd ${{ env.DOC_WKC }}/${{ env.ZH_DOC_REPO }}
|
||||||
yarn ass local
|
yarn ass local
|
||||||
yarn build
|
yarn build
|
||||||
|
|
||||||
- name: Build the english document
|
- name: Build the english document
|
||||||
if: ${{ needs.check.outputs.changed_files_en != '' }}
|
|
||||||
run: |
|
run: |
|
||||||
cd ${{ env.DOC_WKC }}/${{ env.EN_DOC_REPO }}
|
cd ${{ env.DOC_WKC }}/${{ env.EN_DOC_REPO }}
|
||||||
yarn ass local
|
yarn ass local
|
||||||
yarn build
|
yarn build
|
||||||
|
|
||||||
outputs:
|
|
||||||
changed_files_zh: ${{ needs.check.outputs.changed_files_zh }}
|
|
||||||
changed_files_en: ${{ needs.check.outputs.changed_files_en }}
|
|
||||||
changed_files_non_doc: ${{ needs.check.outputs.changed_files_non_doc }}
|
|
||||||
changed_files_non_tdgpt: ${{ needs.check.outputs.changed_files_non_tdgpt }}
|
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
// message process
|
// message process
|
||||||
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
|
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
|
||||||
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
|
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
|
||||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored);
|
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader);
|
||||||
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -137,6 +137,7 @@ enum {
|
||||||
STREAM_QUEUE__SUCESS = 1,
|
STREAM_QUEUE__SUCESS = 1,
|
||||||
STREAM_QUEUE__FAILED,
|
STREAM_QUEUE__FAILED,
|
||||||
STREAM_QUEUE__PROCESSING,
|
STREAM_QUEUE__PROCESSING,
|
||||||
|
STREAM_QUEUE__CHKPTFAILED,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum EStreamTaskEvent {
|
typedef enum EStreamTaskEvent {
|
||||||
|
|
|
@ -8,18 +8,42 @@ ARG cpuType
|
||||||
RUN echo ${pkgFile} && echo ${dirName}
|
RUN echo ${pkgFile} && echo ${dirName}
|
||||||
|
|
||||||
COPY ${pkgFile} /root/
|
COPY ${pkgFile} /root/
|
||||||
|
|
||||||
ENV TINI_VERSION v0.19.0
|
ENV TINI_VERSION v0.19.0
|
||||||
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
ENV DEBIAN_FRONTEND=noninteractive
|
||||||
WORKDIR /root/
|
|
||||||
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat curl gdb vim tmux less net-tools valgrind && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
|
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
|
||||||
|
RUN chmod +x /tini
|
||||||
|
|
||||||
|
RUN tar -zxf ${pkgFile} && \
|
||||||
|
/bin/bash /root/${dirName}/install.sh -e no && \
|
||||||
|
rm /root/${pkgFile} && \
|
||||||
|
rm -rf /root/${dirName} && \
|
||||||
|
apt-get update && \
|
||||||
|
apt-get install -y --no-install-recommends \
|
||||||
|
locales \
|
||||||
|
tzdata \
|
||||||
|
netcat \
|
||||||
|
curl \
|
||||||
|
gdb \
|
||||||
|
vim \
|
||||||
|
tmux \
|
||||||
|
less \
|
||||||
|
net-tools \
|
||||||
|
valgrind \
|
||||||
|
rsync && \
|
||||||
|
apt-get clean && \
|
||||||
|
rm -rf /var/lib/apt/lists/* && \
|
||||||
|
locale-gen en_US.UTF-8
|
||||||
|
|
||||||
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
|
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
|
||||||
LC_CTYPE=en_US.UTF-8 \
|
LC_CTYPE=en_US.UTF-8 \
|
||||||
LANG=en_US.UTF-8 \
|
LANG=en_US.UTF-8 \
|
||||||
LC_ALL=en_US.UTF-8
|
LC_ALL=en_US.UTF-8
|
||||||
|
|
||||||
COPY ./bin/* /usr/bin/
|
COPY ./bin/* /usr/bin/
|
||||||
|
|
||||||
ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
|
ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
|
||||||
CMD ["taosd"]
|
CMD ["taosd"]
|
||||||
VOLUME [ "/var/lib/taos", "/var/log/taos", "/corefile" ]
|
|
||||||
|
VOLUME [ "/var/lib/taos", "/var/log/taos", "/corefile" ]
|
|
@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
case TDMT_STREAM_TASK_DROP:
|
case TDMT_STREAM_TASK_DROP:
|
||||||
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
|
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
|
||||||
case TDMT_VND_STREAM_TASK_UPDATE:
|
case TDMT_VND_STREAM_TASK_UPDATE:
|
||||||
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
|
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
|
||||||
case TDMT_VND_STREAM_TASK_RESET:
|
case TDMT_VND_STREAM_TASK_RESET:
|
||||||
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
|
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
|
||||||
case TDMT_STREAM_TASK_PAUSE:
|
case TDMT_STREAM_TASK_PAUSE:
|
||||||
|
|
|
@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
|
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg,
|
||||||
|
pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -87,6 +87,8 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
tmr_h pTimer = NULL;
|
tmr_h pTimer = NULL;
|
||||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||||
|
|
||||||
|
tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
|
||||||
|
|
||||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
|
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
|
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
|
||||||
|
@ -131,7 +133,7 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMeta->startInfo.startAllTasks) {
|
if (pMeta->startInfo.startAllTasks) {
|
||||||
tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId);
|
tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,7 +160,7 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
|
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// wait for the vnode is freed, and invalid read may occur.
|
// wait for the vnode is freed, and invalid read may occur.
|
||||||
|
@ -320,9 +322,13 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if input queue is full or not
|
// check whether input queue is full or not
|
||||||
if (streamQueueIsFull(pTask->inputq.queue)) {
|
if (streamQueueIsFull(pTask->inputq.queue)) {
|
||||||
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
|
||||||
|
int32_t code = streamTrySchedExec(pTask);
|
||||||
|
if (code) {
|
||||||
|
tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is to process request from transaction, always return true.
|
// this is to process request from transaction, always return true.
|
||||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
|
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
|
||||||
|
|
||||||
if (restored) {
|
if (restored && isLeader) {
|
||||||
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
|
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
|
||||||
pMeta->startInfo.tasksWillRestart = 1;
|
pMeta->startInfo.tasksWillRestart = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updateTasks < numOfTasks) {
|
if (updateTasks < numOfTasks) {
|
||||||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
if (isLeader) {
|
||||||
updateTasks, (numOfTasks - updateTasks));
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||||
|
updateTasks, (numOfTasks - updateTasks));
|
||||||
|
} else {
|
||||||
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
|
||||||
|
(numOfTasks - updateTasks));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if ((code = streamMetaCommit(pMeta)) < 0) {
|
if ((code = streamMetaCommit(pMeta)) < 0) {
|
||||||
// always return true
|
// always return true
|
||||||
|
@ -316,17 +321,21 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
|
|
||||||
streamMetaClearSetUpdateTaskListComplete(pMeta);
|
streamMetaClearSetUpdateTaskListComplete(pMeta);
|
||||||
|
|
||||||
if (!restored) {
|
if (isLeader) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
|
if (!restored) {
|
||||||
} else {
|
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
|
||||||
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
|
} else {
|
||||||
|
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
|
||||||
#if 0
|
#if 0
|
||||||
taosMSleep(5000);// for test purpose, to trigger the leader election
|
taosMSleep(5000);// for test purpose, to trigger the leader election
|
||||||
#endif
|
#endif
|
||||||
code = tqStreamTaskStartAsync(pMeta, cb, true);
|
code = tqStreamTaskStartAsync(pMeta, cb, true);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
|
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
tqDebug("vgId:%d follower nodes not restart tasks", vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7406,6 +7406,52 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
|
||||||
return (SNode*)pCol;
|
return (SNode*)pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isUionOperator(SNode* pNode) {
|
||||||
|
return QUERY_NODE_SET_OPERATOR == nodeType(pNode) && (((SSetOperator*)pNode)->opType == SET_OP_TYPE_UNION ||
|
||||||
|
((SSetOperator*)pNode)->opType == SET_OP_TYPE_UNION_ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t pushdownCastForUnion(STranslateContext* pCxt, SNode* pNode, SExprNode* pExpr, int pos) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (isUionOperator(pNode)) {
|
||||||
|
SSetOperator* pSetOperator = (SSetOperator*)pNode;
|
||||||
|
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
|
||||||
|
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
|
||||||
|
if (LIST_LENGTH(pLeftProjections) != LIST_LENGTH(pRightProjections)) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCORRECT_NUM_OF_COL);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pLeft = NULL;
|
||||||
|
SNode* pRight = NULL;
|
||||||
|
int32_t index = 0;
|
||||||
|
FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) {
|
||||||
|
++index;
|
||||||
|
if (index < pos) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SNode* pRightFunc = NULL;
|
||||||
|
code = createCastFunc(pCxt, pRight, pExpr->resType, &pRightFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code || NULL == pRightFunc) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
REPLACE_LIST2_NODE(pRightFunc);
|
||||||
|
code = pushdownCastForUnion(pCxt, pSetOperator->pRight, (SExprNode*)pRightFunc, index);
|
||||||
|
if (TSDB_CODE_SUCCESS != code ) return code;
|
||||||
|
|
||||||
|
SNode* pLeftFunc = NULL;
|
||||||
|
code = createCastFunc(pCxt, pLeft, pExpr->resType, &pLeftFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code || NULL == pLeftFunc) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
REPLACE_LIST1_NODE(pLeftFunc);
|
||||||
|
code = pushdownCastForUnion(pCxt, pSetOperator->pLeft, (SExprNode*)pLeftFunc, index);
|
||||||
|
if (TSDB_CODE_SUCCESS != code ) return code;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||||
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
|
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
|
||||||
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
|
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
|
||||||
|
@ -7415,9 +7461,11 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
|
||||||
|
|
||||||
SNode* pLeft = NULL;
|
SNode* pLeft = NULL;
|
||||||
SNode* pRight = NULL;
|
SNode* pRight = NULL;
|
||||||
|
int32_t index = 0;
|
||||||
FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) {
|
FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) {
|
||||||
SExprNode* pLeftExpr = (SExprNode*)pLeft;
|
SExprNode* pLeftExpr = (SExprNode*)pLeft;
|
||||||
SExprNode* pRightExpr = (SExprNode*)pRight;
|
SExprNode* pRightExpr = (SExprNode*)pRight;
|
||||||
|
++index;
|
||||||
int32_t comp = dataTypeComp(&pLeftExpr->resType, &pRightExpr->resType);
|
int32_t comp = dataTypeComp(&pLeftExpr->resType, &pRightExpr->resType);
|
||||||
if (comp > 0) {
|
if (comp > 0) {
|
||||||
SNode* pRightFunc = NULL;
|
SNode* pRightFunc = NULL;
|
||||||
|
@ -7427,6 +7475,8 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
|
||||||
}
|
}
|
||||||
REPLACE_LIST2_NODE(pRightFunc);
|
REPLACE_LIST2_NODE(pRightFunc);
|
||||||
pRightExpr = (SExprNode*)pRightFunc;
|
pRightExpr = (SExprNode*)pRightFunc;
|
||||||
|
code = pushdownCastForUnion(pCxt, pSetOperator->pRight, pRightExpr, index);
|
||||||
|
if (TSDB_CODE_SUCCESS != code ) return code;
|
||||||
} else if (comp < 0) {
|
} else if (comp < 0) {
|
||||||
SNode* pLeftFunc = NULL;
|
SNode* pLeftFunc = NULL;
|
||||||
int32_t code = createCastFunc(pCxt, pLeft, pRightExpr->resType, &pLeftFunc);
|
int32_t code = createCastFunc(pCxt, pLeft, pRightExpr->resType, &pLeftFunc);
|
||||||
|
@ -7439,6 +7489,8 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
|
||||||
snprintf(pLeftFuncExpr->userAlias, sizeof(pLeftFuncExpr->userAlias), "%s", pLeftExpr->userAlias);
|
snprintf(pLeftFuncExpr->userAlias, sizeof(pLeftFuncExpr->userAlias), "%s", pLeftExpr->userAlias);
|
||||||
pLeft = pLeftFunc;
|
pLeft = pLeftFunc;
|
||||||
pLeftExpr = pLeftFuncExpr;
|
pLeftExpr = pLeftFuncExpr;
|
||||||
|
code = pushdownCastForUnion(pCxt, pSetOperator->pLeft, pLeftExpr, index);
|
||||||
|
if (TSDB_CODE_SUCCESS != code ) return code;
|
||||||
}
|
}
|
||||||
snprintf(pRightExpr->aliasName, sizeof(pRightExpr->aliasName), "%s", pLeftExpr->aliasName);
|
snprintf(pRightExpr->aliasName, sizeof(pRightExpr->aliasName), "%s", pLeftExpr->aliasName);
|
||||||
SNode* pProj = createSetOperProject(pSetOperator->stmtName, pLeft);
|
SNode* pProj = createSetOperProject(pSetOperator->stmtName, pLeft);
|
||||||
|
|
|
@ -144,6 +144,8 @@ struct SStreamQueue {
|
||||||
STaosQall* qall;
|
STaosQall* qall;
|
||||||
void* qItem;
|
void* qItem;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
|
STaosQueue* pChkptQueue;
|
||||||
|
void* qChkptItem;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SStreamQueueItem {
|
struct SStreamQueueItem {
|
||||||
|
|
|
@ -1098,6 +1098,8 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
||||||
pActiveInfo = pTask->chkInfo.pActiveInfo;
|
pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||||
pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
||||||
|
|
||||||
|
stDebug("s-task:%s acquire task, refId:%" PRId64, id, taskRefId);
|
||||||
|
|
||||||
// check the status every 100ms
|
// check the status every 100ms
|
||||||
if (streamTaskShouldStop(pTask)) {
|
if (streamTaskShouldStop(pTask)) {
|
||||||
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
||||||
|
|
|
@ -915,8 +915,35 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool shouldNotCont(SStreamTask* pTask) {
|
||||||
|
int32_t level = pTask->info.taskLevel;
|
||||||
|
SStreamQueue* pQueue = pTask->inputq.queue;
|
||||||
|
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||||
|
|
||||||
|
// 1. task should jump out
|
||||||
|
bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);
|
||||||
|
|
||||||
|
// 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
|
||||||
|
bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0);
|
||||||
|
|
||||||
|
// 3. no data in ordinary queue
|
||||||
|
bool emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0);
|
||||||
|
|
||||||
|
if (quit) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) {
|
||||||
|
// in checkpoint procedure, we only check whether the controller queue is empty or not
|
||||||
|
return emptyCkQueue;
|
||||||
|
} else { // otherwise, if the block queue is empty, not continue.
|
||||||
|
return emptyBlockQueue && emptyCkQueue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamResumeTask(SStreamTask* pTask) {
|
int32_t streamResumeTask(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t level = pTask->info.taskLevel;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
|
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
|
||||||
|
@ -929,11 +956,10 @@ int32_t streamResumeTask(SStreamTask* pTask) {
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
|
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
|
||||||
}
|
}
|
||||||
// check if continue
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
|
|
||||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
if (shouldNotCont(pTask)) {
|
||||||
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
streamTaskClearSchedIdleInfo(pTask);
|
streamTaskClearSchedIdleInfo(pTask);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -32,11 +32,12 @@ typedef struct SQueueReader {
|
||||||
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
|
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
|
||||||
static void streamTaskPutbackToken(STokenBucket* pBucket);
|
static void streamTaskPutbackToken(STokenBucket* pBucket);
|
||||||
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
|
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
|
||||||
|
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
|
||||||
|
|
||||||
static void streamQueueCleanup(SStreamQueue* pQueue) {
|
static void streamQueueCleanup(SStreamQueue* pQueue) {
|
||||||
SStreamQueueItem* qItem = NULL;
|
SStreamQueueItem* qItem = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
streamQueueNextItem(pQueue, &qItem);
|
streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -47,7 +48,9 @@ static void streamQueueCleanup(SStreamQueue* pQueue) {
|
||||||
|
|
||||||
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
|
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
|
||||||
*pQ = NULL;
|
*pQ = NULL;
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
||||||
if (pQueue == NULL) {
|
if (pQueue == NULL) {
|
||||||
|
@ -55,24 +58,26 @@ int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = taosOpenQueue(&pQueue->pQueue);
|
code = taosOpenQueue(&pQueue->pQueue);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _error);
|
||||||
taosMemoryFreeClear(pQueue);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = taosAllocateQall(&pQueue->qall);
|
code = taosAllocateQall(&pQueue->qall);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _error);
|
||||||
taosCloseQueue(pQueue->pQueue);
|
|
||||||
taosMemoryFree(pQueue);
|
code = taosOpenQueue(&pQueue->pChkptQueue);
|
||||||
return code;
|
TSDB_CHECK_CODE(code, lino, _error);
|
||||||
}
|
|
||||||
|
|
||||||
pQueue->status = STREAM_QUEUE__SUCESS;
|
pQueue->status = STREAM_QUEUE__SUCESS;
|
||||||
|
|
||||||
taosSetQueueCapacity(pQueue->pQueue, cap);
|
taosSetQueueCapacity(pQueue->pQueue, cap);
|
||||||
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
|
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
|
||||||
|
|
||||||
*pQ = pQueue;
|
*pQ = pQueue;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
streamQueueClose(pQueue, 0);
|
||||||
|
stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||||
|
@ -82,6 +87,11 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||||
|
|
||||||
taosFreeQall(pQueue->qall);
|
taosFreeQall(pQueue->qall);
|
||||||
taosCloseQueue(pQueue->pQueue);
|
taosCloseQueue(pQueue->pQueue);
|
||||||
|
pQueue->pQueue = NULL;
|
||||||
|
|
||||||
|
taosCloseQueue(pQueue->pChkptQueue);
|
||||||
|
pQueue->pChkptQueue = NULL;
|
||||||
|
|
||||||
taosMemoryFree(pQueue);
|
taosMemoryFree(pQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,6 +104,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
|
||||||
} else {
|
} else {
|
||||||
pQueue->qItem = NULL;
|
pQueue->qItem = NULL;
|
||||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
|
||||||
if (pQueue->qItem == NULL) {
|
if (pQueue->qItem == NULL) {
|
||||||
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
@ -103,6 +114,56 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
|
||||||
|
*pItem = NULL;
|
||||||
|
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
|
||||||
|
|
||||||
|
if (flag == STREAM_QUEUE__CHKPTFAILED) {
|
||||||
|
*pItem = pQueue->qChkptItem;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flag == STREAM_QUEUE__FAILED) {
|
||||||
|
*pItem = pQueue->qItem;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pQueue->qChkptItem = NULL;
|
||||||
|
taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
|
||||||
|
if (pQueue->qChkptItem != NULL) {
|
||||||
|
stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
|
||||||
|
*pItem = pQueue->qChkptItem;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if in checkpoint status, not read data from ordinary input q.
|
||||||
|
if (status == TASK_STATUS__CK) {
|
||||||
|
stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// let's try the ordinary input q
|
||||||
|
pQueue->qItem = NULL;
|
||||||
|
int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
if (code) {
|
||||||
|
stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQueue->qItem == NULL) {
|
||||||
|
code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||||
|
if (code) {
|
||||||
|
stError("s-task:%s failed to get all items in inputq, code:%s", id, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
if (code) {
|
||||||
|
stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*pItem = streamQueueCurItem(pQueue);
|
||||||
|
}
|
||||||
|
|
||||||
void streamQueueProcessSuccess(SStreamQueue* queue) {
|
void streamQueueProcessSuccess(SStreamQueue* queue) {
|
||||||
if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
|
if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
|
||||||
stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
|
stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
|
||||||
|
@ -110,6 +171,7 @@ void streamQueueProcessSuccess(SStreamQueue* queue) {
|
||||||
}
|
}
|
||||||
|
|
||||||
queue->qItem = NULL;
|
queue->qItem = NULL;
|
||||||
|
queue->qChkptItem = NULL;
|
||||||
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
|
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +183,14 @@ void streamQueueProcessFail(SStreamQueue* queue) {
|
||||||
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
|
||||||
|
if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
|
||||||
|
stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
|
||||||
|
}
|
||||||
|
|
||||||
bool streamQueueIsFull(const SStreamQueue* pQueue) {
|
bool streamQueueIsFull(const SStreamQueue* pQueue) {
|
||||||
int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
|
int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
|
||||||
if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
|
if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
|
||||||
|
@ -175,8 +245,9 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
||||||
|
|
||||||
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||||
int32_t* blockSize) {
|
int32_t* blockSize) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
|
SStreamQueue* pQueue = pTask->inputq.queue;
|
||||||
|
|
||||||
*pInput = NULL;
|
*pInput = NULL;
|
||||||
*numOfBlocks = 0;
|
*numOfBlocks = 0;
|
||||||
|
@ -189,13 +260,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
|
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||||
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
|
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
|
||||||
|
stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
|
||||||
return EXEC_CONTINUE;
|
return EXEC_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamQueueItem* qItem = NULL;
|
SStreamQueueItem* qItem = NULL;
|
||||||
streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem);
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
|
||||||
|
} else {
|
||||||
|
streamQueueNextItem(pQueue, &qItem);
|
||||||
|
}
|
||||||
|
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
// restore the token to bucket
|
// restore the token to bucket
|
||||||
if (*numOfBlocks > 0) {
|
if (*numOfBlocks > 0) {
|
||||||
|
@ -225,14 +302,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
*numOfBlocks = 1;
|
*numOfBlocks = 1;
|
||||||
*pInput = qItem;
|
*pInput = qItem;
|
||||||
return EXEC_CONTINUE;
|
return EXEC_CONTINUE;
|
||||||
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
|
} else { // previous existed blocks needs to be handled, before handle the checkpoint msg block
|
||||||
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
|
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
|
||||||
*blockSize = streamQueueItemGetSize(*pInput);
|
*blockSize = streamQueueItemGetSize(*pInput);
|
||||||
if (taskLevel == TASK_LEVEL__SINK) {
|
if (taskLevel == TASK_LEVEL__SINK) {
|
||||||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamQueueProcessFail(pTask->inputq.queue);
|
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
|
||||||
|
(taskLevel == TASK_LEVEL__SOURCE)) {
|
||||||
|
streamQueueGetSourceChkptFailed(pQueue);
|
||||||
|
} else {
|
||||||
|
streamQueueProcessFail(pQueue);
|
||||||
|
}
|
||||||
return EXEC_CONTINUE;
|
return EXEC_CONTINUE;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -252,7 +334,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamQueueProcessFail(pTask->inputq.queue);
|
streamQueueProcessFail(pQueue);
|
||||||
return EXEC_CONTINUE;
|
return EXEC_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,7 +342,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
}
|
}
|
||||||
|
|
||||||
*numOfBlocks += 1;
|
*numOfBlocks += 1;
|
||||||
streamQueueProcessSuccess(pTask->inputq.queue);
|
streamQueueProcessSuccess(pQueue);
|
||||||
|
|
||||||
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||||
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||||
|
@ -279,6 +361,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
int8_t type = pItem->type;
|
int8_t type = pItem->type;
|
||||||
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
|
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
|
||||||
|
int32_t level = pTask->info.taskLevel;
|
||||||
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
|
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
@ -326,15 +409,28 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
|
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
streamFreeQitem(pItem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
int32_t code = 0;
|
||||||
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
|
||||||
pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
|
STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
|
||||||
|
code = taosWriteQitem(pChkptQ, pItem);
|
||||||
|
|
||||||
|
double size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
|
||||||
|
int32_t num = taosQueueItemSize(pChkptQ);
|
||||||
|
|
||||||
|
stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
|
||||||
|
pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
|
||||||
|
} else {
|
||||||
|
code = taosWriteQitem(pQueue, pItem);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamFreeQitem(pItem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||||
|
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
|
||||||
|
}
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
// use the default memory limit, refactor later.
|
// use the default memory limit, refactor later.
|
||||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
|
|
|
@ -325,12 +325,97 @@ class TDTestCase(TBase):
|
||||||
tdSql.query("select * from t1 where ts > '2025-01-01 00:00:00';")
|
tdSql.query("select * from t1 where ts > '2025-01-01 00:00:00';")
|
||||||
tdSql.checkRows(0)
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
def FIX_TS_6058(self):
|
||||||
|
tdSql.execute("create database iot_60j_production_eqp;")
|
||||||
|
tdSql.execute("create table iot_60j_production_eqp.realtime_data_collections (device_time TIMESTAMP, item_value VARCHAR(64), \
|
||||||
|
upload_time TIMESTAMP) tags(bu_id VARCHAR(64), district_id VARCHAR(64), factory_id VARCHAR(64), production_line_id VARCHAR(64), \
|
||||||
|
production_processes_id VARCHAR(64), work_center_id VARCHAR(64), station_id VARCHAR(64), device_name VARCHAR(64), item_name VARCHAR(64));")
|
||||||
|
|
||||||
|
sub1 = " SELECT '实际速度' as name, 0 as rank, '当月' as cycle,\
|
||||||
|
CASE \
|
||||||
|
WHEN COUNT(item_value) = 0 THEN NULL\
|
||||||
|
ELSE AVG(CAST(item_value AS double))\
|
||||||
|
END AS item_value\
|
||||||
|
FROM iot_60j_production_eqp.realtime_data_collections\
|
||||||
|
WHERE device_time >= TO_TIMESTAMP(CONCAT(substring(TO_CHAR(today ,'YYYY-MM-dd'), 1,7), '-01 00:00:00'), 'YYYY-mm-dd')\
|
||||||
|
AND item_name = 'Premixer_SpindleMotor_ActualSpeed' "
|
||||||
|
|
||||||
|
sub2 = " SELECT '实际速度' as name, 3 as rank, TO_CHAR(TODAY(),'YYYY-MM-dd') as cycle,\
|
||||||
|
CASE \
|
||||||
|
WHEN COUNT(item_value) = 0 THEN NULL\
|
||||||
|
ELSE AVG(CAST(item_value AS double))\
|
||||||
|
END AS item_value\
|
||||||
|
FROM iot_60j_production_eqp.realtime_data_collections\
|
||||||
|
WHERE device_time >= TODAY()-1d and device_time <= now()\
|
||||||
|
AND item_name = 'Premixer_SpindleMotor_ActualSpeed' "
|
||||||
|
|
||||||
|
sub3 = " SELECT '设定速度' as name, 1 as rank, CAST(CONCAT('WEEK-',CAST(WEEKOFYEAR(TODAY()-1w) as VARCHAR)) as VARCHAR) as cycle,\
|
||||||
|
CASE \
|
||||||
|
WHEN COUNT(item_value) = 0 THEN NULL\
|
||||||
|
ELSE AVG(CAST(item_value AS double))\
|
||||||
|
END AS item_value\
|
||||||
|
FROM iot_60j_production_eqp.realtime_data_collections\
|
||||||
|
where \
|
||||||
|
item_name = 'Premixer_SpindleMotor_SettingSpeed'\
|
||||||
|
AND (\
|
||||||
|
(WEEKDAY(now) = 0 AND device_time >= today()-8d and device_time <= today()-1d) OR\
|
||||||
|
(WEEKDAY(now) = 1 AND device_time >= today()-9d and device_time <= today()-2d) OR\
|
||||||
|
(WEEKDAY(now) = 2 AND device_time >= today()-10d and device_time <= today()-3d) OR\
|
||||||
|
(WEEKDAY(now) = 3 AND device_time >= today()-11d and device_time <= today()-4d) OR\
|
||||||
|
(WEEKDAY(now) = 4 AND device_time >= today()-12d and device_time <= today()-5d) OR\
|
||||||
|
(WEEKDAY(now) = 5 AND device_time >= today()-13d and device_time <= today()-6d) OR\
|
||||||
|
(WEEKDAY(now) = 6 AND device_time >= today()-14d and device_time <= today()-7d)\
|
||||||
|
) "
|
||||||
|
|
||||||
|
sub4 = " SELECT '设定速度2' as name, 1 as rank, CAST(CONCAT('WEEK-',CAST(WEEKOFYEAR(TODAY()-1w) as VARCHAR)) as VARCHAR(5000)) as cycle,\
|
||||||
|
CASE \
|
||||||
|
WHEN COUNT(item_value) = 0 THEN NULL\
|
||||||
|
ELSE AVG(CAST(item_value AS double))\
|
||||||
|
END AS item_value\
|
||||||
|
FROM iot_60j_production_eqp.realtime_data_collections\
|
||||||
|
where \
|
||||||
|
item_name = 'Premixer_SpindleMotor_SettingSpeed'\
|
||||||
|
AND (\
|
||||||
|
(WEEKDAY(now) = 0 AND device_time >= today()-8d and device_time <= today()-1d) OR\
|
||||||
|
(WEEKDAY(now) = 1 AND device_time >= today()-9d and device_time <= today()-2d) OR\
|
||||||
|
(WEEKDAY(now) = 2 AND device_time >= today()-10d and device_time <= today()-3d) OR\
|
||||||
|
(WEEKDAY(now) = 3 AND device_time >= today()-11d and device_time <= today()-4d) OR\
|
||||||
|
(WEEKDAY(now) = 4 AND device_time >= today()-12d and device_time <= today()-5d) OR\
|
||||||
|
(WEEKDAY(now) = 5 AND device_time >= today()-13d and device_time <= today()-6d) OR\
|
||||||
|
(WEEKDAY(now) = 6 AND device_time >= today()-14d and device_time <= today()-7d)\
|
||||||
|
) "
|
||||||
|
for uiontype in ["union" ,"union all"]:
|
||||||
|
repeatLines = 1
|
||||||
|
if uiontype == "union":
|
||||||
|
repeatLines = 0
|
||||||
|
for i in range(1, 10):
|
||||||
|
tdLog.debug(f"test: realtime_data_collections {i} times...")
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub1} {uiontype} {sub2} {uiontype} {sub3}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub2} {uiontype} {sub4} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub2} {uiontype} {sub3} {uiontype} {sub1} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub4} {uiontype} {sub1} {uiontype} {sub2}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub4} {uiontype} {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(4 + repeatLines)
|
||||||
|
tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1)
|
||||||
|
tdSql.checkRows(4 + repeatLines)
|
||||||
|
|
||||||
# run
|
# run
|
||||||
def run(self):
|
def run(self):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
|
||||||
self.ts5946()
|
self.ts5946()
|
||||||
|
|
||||||
# TD BUGS
|
# TD BUGS
|
||||||
self.FIX_TD_30686()
|
self.FIX_TD_30686()
|
||||||
self.FIX_TD_31684()
|
self.FIX_TD_31684()
|
||||||
|
@ -340,6 +425,7 @@ class TDTestCase(TBase):
|
||||||
self.FIX_TS_5143()
|
self.FIX_TS_5143()
|
||||||
self.FIX_TS_5239()
|
self.FIX_TS_5239()
|
||||||
self.FIX_TS_5984()
|
self.FIX_TS_5984()
|
||||||
|
self.FIX_TS_6058()
|
||||||
|
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue