From 047ba9ae85928e926d873f4f42f784d966324563 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Wed, 26 Feb 2025 20:40:41 +0800 Subject: [PATCH 01/21] feat: add rsync to Dockerfile --- packaging/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index a67724d5a8..e2ba30bca6 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -12,7 +12,7 @@ ENV TINI_VERSION v0.19.0 ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini ENV DEBIAN_FRONTEND=noninteractive WORKDIR /root/ -RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat curl gdb vim tmux less net-tools valgrind && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini +RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat curl gdb vim tmux less net-tools valgrind rsync && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \ LC_CTYPE=en_US.UTF-8 \ From 73e8720b0a8cbae2c90fb5874019dcfaca6ec493 Mon Sep 17 00:00:00 2001 From: WANG Xu Date: Wed, 26 Feb 2025 21:09:16 +0800 Subject: [PATCH 02/21] refactor: break RUN command into multiple lines Signed-off-by: WANG Xu --- packaging/docker/Dockerfile | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index e2ba30bca6..c0aadac376 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -8,18 +8,42 @@ ARG cpuType RUN echo ${pkgFile} && echo ${dirName} COPY ${pkgFile} /root/ + ENV TINI_VERSION v0.19.0 -ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini ENV DEBIAN_FRONTEND=noninteractive -WORKDIR /root/ -RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat curl gdb vim tmux less net-tools valgrind rsync && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini + +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini +RUN chmod +x /tini + +RUN tar -zxf ${pkgFile} && \ + /bin/bash /root/${dirName}/install.sh -e no && \ + rm /root/${pkgFile} && \ + rm -rf /root/${dirName} && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + locales \ + tzdata \ + netcat \ + curl \ + gdb \ + vim \ + tmux \ + less \ + net-tools \ + valgrind \ + rsync && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* && \ + locale-gen en_US.UTF-8 ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \ LC_CTYPE=en_US.UTF-8 \ LANG=en_US.UTF-8 \ LC_ALL=en_US.UTF-8 + COPY ./bin/* /usr/bin/ ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"] CMD ["taosd"] -VOLUME [ "/var/lib/taos", "/var/log/taos", "/corefile" ] + +VOLUME [ "/var/lib/taos", "/var/log/taos", "/corefile" ] \ No newline at end of file From 67853b4a5461d72739f49f853f1f6abaa3f85b12 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 18:19:37 +0800 Subject: [PATCH 03/21] enh(stream): add checkpoint queue for 
source tasks. --- include/libs/stream/tstream.h | 1 + source/libs/stream/inc/streamInt.h | 2 + source/libs/stream/src/streamQueue.c | 136 ++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 26 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4c0aaf742..5188e3c667 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -136,6 +136,7 @@ enum { enum { STREAM_QUEUE__SUCESS = 1, STREAM_QUEUE__FAILED, + STREAM_QUEUE__CHKPTFAILED, STREAM_QUEUE__PROCESSING, }; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d9778a6a05..0e5aaac58c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -144,6 +144,8 @@ struct SStreamQueue { STaosQall* qall; void* qItem; int8_t status; + STaosQueue* pChkptQueue; + void* qChkptItem; }; struct SStreamQueueItem { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 401aa7530d..2a1332c0c4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -47,7 +47,9 @@ static void streamQueueCleanup(SStreamQueue* pQueue) { int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { *pQ = NULL; + int32_t code = 0; + int32_t lino = 0; SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); if (pQueue == NULL) { @@ -55,24 +57,26 @@ int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { } code = taosOpenQueue(&pQueue->pQueue); - if (code) { - taosMemoryFreeClear(pQueue); - return code; - } + TSDB_CHECK_CODE(code, lino, _error); code = taosAllocateQall(&pQueue->qall); - if (code) { - taosCloseQueue(pQueue->pQueue); - taosMemoryFree(pQueue); - return code; - } + TSDB_CHECK_CODE(code, lino, _error); + + code = taosOpenQueue(&pQueue->pChkptQueue); + TSDB_CHECK_CODE(code, lino, _error); pQueue->status = STREAM_QUEUE__SUCESS; + taosSetQueueCapacity(pQueue->pQueue, cap); taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); *pQ = pQueue; return code; + +_error: + streamQueueClose(pQueue, 0); + stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code)); + return code; } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { @@ -82,6 +86,11 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { taosFreeQall(pQueue->qall); taosCloseQueue(pQueue->pQueue); + pQueue->pQueue = NULL; + + taosCloseQueue(pQueue->pChkptQueue); + pQueue->pChkptQueue = NULL; + taosMemoryFree(pQueue); } @@ -94,6 +103,46 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } else { pQueue->qItem = NULL; (void) taosGetQitem(pQueue->qall, &pQueue->qItem); + + if (pQueue->qItem == NULL) { + (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall); + (void) taosGetQitem(pQueue->qall, &pQueue->qItem); + } + + *pItem = streamQueueCurItem(pQueue); + } +} + +void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status) { + *pItem = NULL; + int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); + + if (flag == STREAM_QUEUE__CHKPTFAILED) { + *pItem = pQueue->qChkptItem; + } else { + pQueue->qChkptItem = NULL; + taosReadQitem(pQueue->pChkptQueue, (void**) &pQueue->qChkptItem); + if (pQueue->qChkptItem != NULL) { + stDebug("read data from checkpoint queue, status:%d", status); + + *pItem = pQueue->qChkptItem; + return; + } + + // if in checkpoint status, not read data from ordinary input q. 
+ if (status == TASK_STATUS__CK) { + stDebug("in checkpoint status, not ready data in normal queue"); + return; + } + } + + // let's try the ordinary input q + if (flag == STREAM_QUEUE__FAILED) { + *pItem = streamQueueCurItem(pQueue); + } else { + pQueue->qItem = NULL; + (void) taosGetQitem(pQueue->qall, &pQueue->qItem); + if (pQueue->qItem == NULL) { (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall); (void) taosGetQitem(pQueue->qall, &pQueue->qItem); @@ -110,6 +159,7 @@ void streamQueueProcessSuccess(SStreamQueue* queue) { } queue->qItem = NULL; + queue->qChkptItem = NULL; atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); } @@ -121,6 +171,14 @@ void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } +void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) { + if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) { + stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING); + return; + } + atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED); +} + bool streamQueueIsFull(const SStreamQueue* pQueue) { int32_t numOfItems = streamQueueGetNumOfItems(pQueue); if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { @@ -175,8 +233,9 @@ const char* streamQueueItemGetTypeStr(int32_t type) { EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { - const char* id = pTask->id.idStr; - int32_t taskLevel = pTask->info.taskLevel; + const char* id = pTask->id.idStr; + int32_t taskLevel = pTask->info.taskLevel; + SStreamQueue* pQueue = pTask->inputq.queue; *pInput = NULL; *numOfBlocks = 0; @@ -189,13 +248,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte } while (1) { - if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { - stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); + ETaskStatus status = streamTaskGetStatus(pTask).state; + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) { + stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); return EXEC_CONTINUE; } SStreamQueueItem* qItem = NULL; - streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem); + if (taskLevel == TASK_LEVEL__SOURCE) { + streamQueueNextItemInSourceQ(pQueue, &qItem, status); + } else { + streamQueueNextItem(pQueue, &qItem); + } + if (qItem == NULL) { // restore the token to bucket if (*numOfBlocks > 0) { @@ -225,14 +290,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte *numOfBlocks = 1; *pInput = qItem; return EXEC_CONTINUE; - } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block + } else { // previous existed blocks needs to be handled, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputq.queue); + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && + (taskLevel == TASK_LEVEL__SOURCE)) { + streamQueueGetSourceChkptFailed(pQueue); + } else { + streamQueueProcessFail(pQueue); + } return EXEC_CONTINUE; } } else { @@ -252,7 +322,7 @@ EExtractDataCode 
streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputq.queue); + streamQueueProcessFail(pQueue); return EXEC_CONTINUE; } @@ -260,7 +330,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte } *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputq.queue); + streamQueueProcessSuccess(pQueue); if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); @@ -279,6 +349,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; STaosQueue* pQueue = pTask->inputq.queue->pQueue; + int32_t level = pTask->info.taskLevel; int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1; if (type == STREAM_INPUT__DATA_SUBMIT) { @@ -326,15 +397,28 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { - int32_t code = taosWriteQitem(pQueue, pItem); - if (code != TSDB_CODE_SUCCESS) { - streamFreeQitem(pItem); - return code; - } - double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); - stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); + int32_t code = 0; + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && level == TASK_LEVEL__SOURCE) { + STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue; + code = taosWriteQitem(pChkptQ, pItem); + + double size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ)); + int32_t num = taosQueueItemSize(pChkptQ); + + stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d", + pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1)); + } else { + code = taosWriteQitem(pQueue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamFreeQitem(pItem); + return code; + } + + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); + stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); + } } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); From 24c6fe727ff72ce32ff771b168d02f35cdfdc547 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 21:50:51 +0800 Subject: [PATCH 04/21] refactor(stream): do some internal refactor. 
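A note on the resulting control flow: the nested if/else in
streamQueueNextItemInSourceQ is flattened into early returns. Condensed into a
standalone sketch for reference -- every name below (QSketch, pollChkptQ, and
so on) is a stand-in for the real SStreamQueue machinery, not an actual
project API:

    /* Priority: retry a failed checkpoint item, then retry a failed ordinary
     * item, then a fresh checkpoint item, then ordinary data -- but ordinary
     * data is never consumed while the task is in checkpoint status. */
    #include <stddef.h>

    typedef enum { Q_SUCCESS = 1, Q_FAILED, Q_PROCESSING, Q_CHKPT_FAILED } QFlag;
    typedef enum { ST_READY, ST_CHECKPOINT } TState;

    typedef struct {
      QFlag flag;                /* mirrors SStreamQueue.status              */
      void *failedItem;          /* ordinary item whose processing failed    */
      void *failedChkptItem;     /* checkpoint item whose processing failed  */
      void *(*pollChkptQ)(void); /* stand-in for reading pChkptQueue         */
      void *(*pollInputQ)(void); /* stand-in for reading the ordinary queue  */
    } QSketch;

    static void *nextItemSketch(QSketch *q, TState st) {
      QFlag flag = q->flag;
      q->flag = Q_PROCESSING;
      if (flag == Q_CHKPT_FAILED) return q->failedChkptItem; /* early return  */
      if (flag == Q_FAILED)       return q->failedItem;      /* early return  */
      void *item = q->pollChkptQ();
      if (item != NULL) return item;        /* checkpoint messages come first */
      if (st == ST_CHECKPOINT) return NULL; /* hold ordinary data during ck   */
      return q->pollInputQ();
    }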
--- source/libs/stream/src/streamQueue.c | 52 +++++++++++++++------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 2a1332c0c4..6a9a1ac880 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -119,37 +119,41 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem if (flag == STREAM_QUEUE__CHKPTFAILED) { *pItem = pQueue->qChkptItem; - } else { - pQueue->qChkptItem = NULL; - taosReadQitem(pQueue->pChkptQueue, (void**) &pQueue->qChkptItem); - if (pQueue->qChkptItem != NULL) { - stDebug("read data from checkpoint queue, status:%d", status); + ASSERT(status != TASK_STATUS__CK && pQueue->qItem == NULL); + return; + } - *pItem = pQueue->qChkptItem; - return; - } + if (flag == STREAM_QUEUE__FAILED) { + *pItem = pQueue->qItem; + ASSERT(status != TASK_STATUS__CK && pQueue->qChkptItem == NULL); + return; + } - // if in checkpoint status, not read data from ordinary input q. - if (status == TASK_STATUS__CK) { - stDebug("in checkpoint status, not ready data in normal queue"); - return; - } + pQueue->qChkptItem = NULL; + taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem); + if (pQueue->qChkptItem != NULL) { + stDebug("read data from checkpoint queue, status:%d", status); + + *pItem = pQueue->qChkptItem; + return; + } + + // if in checkpoint status, not read data from ordinary input q. + if (status == TASK_STATUS__CK) { + stDebug("in checkpoint status, not ready data in normal queue"); + return; } // let's try the ordinary input q - if (flag == STREAM_QUEUE__FAILED) { - *pItem = streamQueueCurItem(pQueue); - } else { - pQueue->qItem = NULL; - (void) taosGetQitem(pQueue->qall, &pQueue->qItem); + pQueue->qItem = NULL; + (void)taosGetQitem(pQueue->qall, &pQueue->qItem); - if (pQueue->qItem == NULL) { - (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall); - (void) taosGetQitem(pQueue->qall, &pQueue->qItem); - } - - *pItem = streamQueueCurItem(pQueue); + if (pQueue->qItem == NULL) { + (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall); + (void)taosGetQitem(pQueue->qall, &pQueue->qItem); } + + *pItem = streamQueueCurItem(pQueue); } void streamQueueProcessSuccess(SStreamQueue* queue) { From f56aeaf1bd06d3a60da5fa0a7d929257d1da2348 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 22:57:06 +0800 Subject: [PATCH 05/21] fix(stream): fix error in deciding exec should quit or not. --- source/libs/stream/src/streamExec.c | 28 +++++++++++++++++++++++++--- source/libs/stream/src/streamQueue.c | 9 ++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 054f88ec3d..ef3fb11866 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,31 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } +static bool shouldNotCont(SStreamTask* pTask) { + int32_t level = pTask->info.taskLevel; + SStreamQueue* pQueue = pTask->inputq.queue; + ETaskStatus status = streamTaskGetStatus(pTask).state; + + // 1. task should jump out + bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); + + // 2. 
checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue + bool notCkCont = + (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK); + + // 3. no data in ordinary queue + int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + + if ((numOfItems == 0) || quit || notCkCont) { + return true; + } else { + return false; + } +} + int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; + int32_t level = pTask->info.taskLevel; int32_t code = 0; if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { @@ -929,11 +952,10 @@ int32_t streamResumeTask(SStreamTask* pTask) { if (code) { stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } - // check if continue + streamMutexLock(&pTask->lock); - int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + if (shouldNotCont(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamTaskClearSchedIdleInfo(pTask); streamMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6a9a1ac880..3d0d01ce0e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -113,7 +113,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } } -void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status) { +void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) { *pItem = NULL; int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); @@ -132,15 +132,14 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem pQueue->qChkptItem = NULL; taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem); if (pQueue->qChkptItem != NULL) { - stDebug("read data from checkpoint queue, status:%d", status); - + stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status); *pItem = pQueue->qChkptItem; return; } // if in checkpoint status, not read data from ordinary input q. if (status == TASK_STATUS__CK) { - stDebug("in checkpoint status, not ready data in normal queue"); + stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status); return; } @@ -260,7 +259,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte SStreamQueueItem* qItem = NULL; if (taskLevel == TASK_LEVEL__SOURCE) { - streamQueueNextItemInSourceQ(pQueue, &qItem, status); + streamQueueNextItemInSourceQ(pQueue, &qItem, status, id); } else { streamQueueNextItem(pQueue, &qItem); } From ed03b3a22cc3e70e7919e9d81c61d462b32893fb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Jan 2025 00:50:36 +0800 Subject: [PATCH 06/21] fix(stream): try starting task even the inputQ is full. 
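Previously a full input queue made taskReadyForDataFromWal report "not ready"
and the task simply waited for the next wal-scan timer tick. It now also
schedules the task so the queued backlog gets drained. A minimal, runnable
model of the new behavior -- the stub names here are illustrative, not the
real streamQueueIsFull/streamTrySchedExec API:

    #include <stdio.h>

    static int  inputQueueFull = 1; /* assume the queue is full for the demo */

    static void trySchedExecStub(const char *id) { printf("sched s-task:%s\n", id); }

    static int readyForWalDataStub(const char *id) {
      if (inputQueueFull) {
        trySchedExecStub(id); /* new: kick the task to drain its backlog */
        return 0;             /* still refuse to pull more data from wal */
      }
      return 1;
    }

    int main(void) { (void)readyForWalDataStub("0x1"); return 0; }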
--- source/dnode/vnode/src/tq/tqStreamTask.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b34ea78f64..88bbc30b7b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -317,9 +317,10 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return false; } - // check if input queue is full or not + // check whether input queue is full or not if (streamQueueIsFull(pTask->inputq.queue)) { - tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); + tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr); + streamTrySchedExec(pTask); return false; } From cb9a1a852d58aee1bb743e1cf1cd901ce03e28da Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Jan 2025 01:38:16 +0800 Subject: [PATCH 07/21] refactor: update some logs. --- source/libs/stream/src/streamDispatch.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b7039d372d..b093f808c0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1098,6 +1098,8 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { pActiveInfo = pTask->chkInfo.pActiveInfo; pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + stDebug("s-task:%s acquire task, refId:%" PRId64, id, taskRefId); + // check the status every 100ms if (streamTaskShouldStop(pTask)) { streamCleanBeforeQuitTmr(pTmrInfo, param); From 34a439d78bb322ad2ec5c3814eae072053855430 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Feb 2025 10:59:29 +0800 Subject: [PATCH 08/21] fix(stream): fix error in the check of continuing execution condition. --- source/libs/stream/src/streamExec.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ef3fb11866..3555515f75 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -924,16 +924,20 @@ static bool shouldNotCont(SStreamTask* pTask) { bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue - bool notCkCont = - (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK); + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE); // 3. no data in ordinary queue - int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); - if ((numOfItems == 0) || quit || notCkCont) { + if (quit) { return true; } else { - return false; + if (status == TASK_STATUS__CK) { + // in checkpoint procedure, we only check whether the controller queue is empty or not + return emptyCkQueue; + } else { // otherwise, if the block queue is empty, not continue. + return emptyBlockQueue; + } } } From 732a9d4b32b9cb36be64b3bbe674e242f1a90307 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Feb 2025 13:35:23 +0800 Subject: [PATCH 09/21] fix(stream): fix error in the check of continuing execution condition. 
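The continue-or-stop decision now depends on the task level and on which
queue is relevant in the current state. Reduced to a standalone sketch
(stub types; the real shouldNotCont reads the same facts from SStreamTask):

    #include <stdbool.h>

    typedef enum { LV_SOURCE, LV_AGG, LV_SINK } Level;
    typedef enum { S_RUNNING, S_CHECKPOINT, S_HALT } State; /* S_HALT: stop/pause/dropping */

    /* halt states           -> stop executing immediately
     * source task during ck -> stop only once the checkpoint queue is empty
     * everything else       -> stop once both queues are empty              */
    static bool shouldNotContSketch(State st, Level lv, int ckQItems, int inputQItems) {
      if (st == S_HALT) return true;
      if (st == S_CHECKPOINT && lv == LV_SOURCE) return ckQItems == 0;
      return inputQItems == 0 && ckQItems == 0;
    }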
--- source/libs/stream/src/streamExec.c | 8 ++++---- source/libs/stream/src/streamQueue.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3555515f75..1015917f61 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -924,19 +924,19 @@ static bool shouldNotCont(SStreamTask* pTask) { bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue - bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE); + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0); // 3. no data in ordinary queue - int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); + bool emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); if (quit) { return true; } else { - if (status == TASK_STATUS__CK) { + if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) { // in checkpoint procedure, we only check whether the controller queue is empty or not return emptyCkQueue; } else { // otherwise, if the block queue is empty, not continue. - return emptyBlockQueue; + return emptyBlockQueue && emptyCkQueue; } } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 3d0d01ce0e..f68dd1452f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -402,7 +402,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { int32_t code = 0; - if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && level == TASK_LEVEL__SOURCE) { + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) { STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue; code = taosWriteQitem(pChkptQ, pItem); From e60321196bf7f360ac5a1630648048a4f075371a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Feb 2025 22:29:41 +0800 Subject: [PATCH 10/21] fix(stream): remove invalid assert. --- source/libs/stream/src/streamQueue.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index f68dd1452f..c7b0bc8f11 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -119,13 +119,11 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem if (flag == STREAM_QUEUE__CHKPTFAILED) { *pItem = pQueue->qChkptItem; - ASSERT(status != TASK_STATUS__CK && pQueue->qItem == NULL); return; } if (flag == STREAM_QUEUE__FAILED) { *pItem = pQueue->qItem; - ASSERT(status != TASK_STATUS__CK && pQueue->qChkptItem == NULL); return; } From 3e55f8edfed67b32a5ccdcc584a329b7ba606ef2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 10:43:56 +0800 Subject: [PATCH 11/21] fix(stream): follower nodes not restart tasks. 
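tqStreamTaskProcessUpdateReq gains an isLeader argument: a follower records
the epset update and commits the meta, while only a restored leader sets the
restart flag and relaunches its tasks. The gate, reduced to a sketch with
stub types in place of the real signatures:

    typedef enum { ROLE_FOLLOWER, ROLE_LEADER } Role;

    /* returns 1 when tasks should be restarted after the epset update */
    static int shouldRestartSketch(int vnodeRestored, Role role) {
      return vnodeRestored && (role == ROLE_LEADER);
    }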
--- include/dnode/vnode/tqCommon.h | 2 +- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 31 ++++++++++++++-------- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4d5e18520c..d4ca0aba62 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -19,7 +19,7 @@ // message process int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); -int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader); int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..b15ac447b0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_DROP: return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: - return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); + return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true); case TDMT_VND_STREAM_TASK_RESET: return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_PAUSE: diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 63727e5c45..e1298b11ea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); + return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, + pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER)); } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2c48ada0fa..fb3582c5ff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream } // this is to process request from transaction, always return true. 
-int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); @@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - if (restored) { + if (restored && isLeader) { tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); pMeta->startInfo.tasksWillRestart = 1; } if (updateTasks < numOfTasks) { - tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, - updateTasks, (numOfTasks - updateTasks)); + if (isLeader) { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + updateTasks, (numOfTasks - updateTasks)); + } else { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks, + (numOfTasks - updateTasks)); + } } else { if ((code = streamMetaCommit(pMeta)) < 0) { // always return true @@ -316,17 +321,21 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaClearSetUpdateTaskListComplete(pMeta); - if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); - } else { - tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); + if (isLeader) { + if (!restored) { + tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); + } else { + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 taosMSleep(5000);// for test purpose, to trigger the leader election #endif - code = tqStreamTaskStartAsync(pMeta, cb, true); - if (code) { - tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); + code = tqStreamTaskStartAsync(pMeta, cb, true); + if (code) { + tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); + } } + } else { + tqDebug("vgId:%d follower nodes not restart tasks", vgId); } } From 14ef25101dddd37307b6fc75d6ee10f84eb887e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 15:37:59 +0800 Subject: [PATCH 12/21] fix(stream): adjust log. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 88bbc30b7b..57579eecb4 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -131,7 +131,7 @@ static void doStartScanWal(void* param, void* tmrId) { } if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId); + tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId); goto _end; } From 951b469b10c6a0e306662a35d904b9638df611bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 16:18:19 +0800 Subject: [PATCH 13/21] fix(stream): add some logs. 
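The new debug line records every entry into the wal-scan timer callback
together with the meta ref id, so a later failed taosAcquireRef can be
matched to the timer that fired. A toy model of the acquire-or-bail pattern
being logged around (stub names, not the real API):

    #include <stdio.h>

    static void *acquireRefStub(long long id) { (void)id; return NULL; /* already freed */ }

    static void scanWalTmrStub(long long metaRid) {
      printf("start to do scan wal in tmr, metaRid:%lld\n", metaRid); /* the added log */
      void *pMeta = acquireRefStub(metaRid);
      if (pMeta == NULL) {
        printf("metaRid:%lld not valid now\n", metaRid);
        return; /* the entry log above ties this failure to a concrete timer run */
      }
    }

    int main(void) { scanWalTmrStub(42); return 0; }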
--- source/dnode/vnode/src/tq/tqStreamTask.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 57579eecb4..1e021a8d5b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -87,6 +87,8 @@ static void doStartScanWal(void* param, void* tmrId) { tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; + tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId); + SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); if (pMeta == NULL) { tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); From 6e1206bbc11ef6e2bb0f7da5284c9a86bc14b731 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 16:18:49 +0800 Subject: [PATCH 14/21] fix(stream): add some logs. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1e021a8d5b..c8862b03ad 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -156,7 +156,7 @@ static void doStartScanWal(void* param, void* tmrId) { goto _end; } - tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); #if 0 // wait for the vnode is freed, and invalid read may occur. From b23f20a4507c78673b848175946acc5c4a3c3576 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Feb 2025 14:29:49 +0800 Subject: [PATCH 15/21] fix(stream): fix memory leak in checkpt queue. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index c7b0bc8f11..300b3bdb4d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -32,11 +32,12 @@ typedef struct SQueueReader { static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); +static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id); static void streamQueueCleanup(SStreamQueue* pQueue) { SStreamQueueItem* qItem = NULL; while (1) { - streamQueueNextItem(pQueue, &qItem); + streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL); if (qItem == NULL) { break; } @@ -250,7 +251,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte while (1) { ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) { + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) { stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); return EXEC_CONTINUE; } From 00e02280b81eed943e3843f3d8feb3b160b26cd7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Feb 2025 14:01:26 +0800 Subject: [PATCH 16/21] fix(stream): adjust the error code position. 
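Presumably the reason for the move (the commit itself does not say): C enum
values are implicit, and patch 03 inserted STREAM_QUEUE__CHKPTFAILED in the
middle of the enum, silently shifting STREAM_QUEUE__PROCESSING from 3 to 4.
Appending the new member instead keeps the pre-existing values stable:

    /* suffixes added only to avoid enumerator clashes in one translation unit */
    enum { A_SUCESS = 1, A_FAILED, A_PROCESSING };                /* original: 1,2,3          */
    enum { B_SUCESS = 1, B_FAILED, B_CHKPTFAILED, B_PROCESSING }; /* patch 03: PROCESSING = 4 */
    enum { C_SUCESS = 1, C_FAILED, C_PROCESSING, C_CHKPTFAILED }; /* patch 16: PROCESSING = 3 */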
---
 include/libs/stream/tstream.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 5188e3c667..0fa32acfbb 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -136,8 +136,8 @@ enum {
 enum {
   STREAM_QUEUE__SUCESS = 1,
   STREAM_QUEUE__FAILED,
-  STREAM_QUEUE__CHKPTFAILED,
   STREAM_QUEUE__PROCESSING,
+  STREAM_QUEUE__CHKPTFAILED,
 };
 
 typedef enum EStreamTaskEvent {

From 3892a98e2ee61619fadbcf1f30ff74c4f0c6a824 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Fri, 28 Feb 2025 14:09:45 +0800
Subject: [PATCH 17/21] fix(stream): check return values.

---
 source/libs/stream/src/streamQueue.c | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 300b3bdb4d..954e41f288 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -144,11 +144,21 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem
 
   // let's try the ordinary input q
   pQueue->qItem = NULL;
-  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
+  int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
+  if (code) {
+    stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
+  }
 
   if (pQueue->qItem == NULL) {
-    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
-    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
+    code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
+    if (code) {
+      stError("s-task:%s failed to get all items in inputq, code:%s", id, tstrerror(code));
+    }
+
+    code = taosGetQitem(pQueue->qall, &pQueue->qItem);
+    if (code) {
+      stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
+    }
   }
 
   *pItem = streamQueueCurItem(pQueue);

From 1462f64ce209b1628b505333003ea5b427a9c4e0 Mon Sep 17 00:00:00 2001
From: facetosea <285808407@qq.com>
Date: Fri, 28 Feb 2025 14:15:41 +0800
Subject: [PATCH 18/21] fix: different length in union

---
 source/libs/parser/src/parTranslater.c | 52 +++++++++++++++
 tests/army/query/queryBugs.py          | 88 +++++++++++++++++++++++++-
 2 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index be4882935f..3511636bd9 100755
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -7406,6 +7406,52 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
   return (SNode*)pCol;
 }
 
+static bool isUionOperator(SNode* pNode) {
+  return QUERY_NODE_SET_OPERATOR == nodeType(pNode) && (((SSetOperator*)pNode)->opType == SET_OP_TYPE_UNION ||
+                                                        ((SSetOperator*)pNode)->opType == SET_OP_TYPE_UNION_ALL);
+}
+
+static int32_t pushdownCastForUnion(STranslateContext* pCxt, SNode* pNode, SExprNode* pExpr, int pos) {
+  int32_t code = TSDB_CODE_SUCCESS;
+  if (isUionOperator(pNode)) {
+    SSetOperator* pSetOperator = (SSetOperator*)pNode;
+    SNodeList*    pLeftProjections = getProjectList(pSetOperator->pLeft);
+    SNodeList*    pRightProjections = getProjectList(pSetOperator->pRight);
+    if (LIST_LENGTH(pLeftProjections) != LIST_LENGTH(pRightProjections)) {
+      return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCORRECT_NUM_OF_COL);
+    }
+
+    SNode*  pLeft = NULL;
+    SNode*  pRight = NULL;
+    int32_t index = 0;
+    FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) {
+      ++index;
+      if (index < pos) {
+        continue;
+      }
+      SNode* pRightFunc = NULL;
+      code = createCastFunc(pCxt, pRight,
pExpr->resType, &pRightFunc); + if (TSDB_CODE_SUCCESS != code || NULL == pRightFunc) { + return code; + } + REPLACE_LIST2_NODE(pRightFunc); + code = pushdownCastForUnion(pCxt, pSetOperator->pRight, (SExprNode*)pRightFunc, index); + if (TSDB_CODE_SUCCESS != code ) return code; + + SNode* pLeftFunc = NULL; + code = createCastFunc(pCxt, pLeft, pExpr->resType, &pLeftFunc); + if (TSDB_CODE_SUCCESS != code || NULL == pLeftFunc) { + return code; + } + REPLACE_LIST1_NODE(pLeftFunc); + code = pushdownCastForUnion(pCxt, pSetOperator->pLeft, (SExprNode*)pLeftFunc, index); + if (TSDB_CODE_SUCCESS != code ) return code; + break; + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) { SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft); SNodeList* pRightProjections = getProjectList(pSetOperator->pRight); @@ -7415,9 +7461,11 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS SNode* pLeft = NULL; SNode* pRight = NULL; + int32_t index = 0; FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) { SExprNode* pLeftExpr = (SExprNode*)pLeft; SExprNode* pRightExpr = (SExprNode*)pRight; + ++index; int32_t comp = dataTypeComp(&pLeftExpr->resType, &pRightExpr->resType); if (comp > 0) { SNode* pRightFunc = NULL; @@ -7427,6 +7475,8 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS } REPLACE_LIST2_NODE(pRightFunc); pRightExpr = (SExprNode*)pRightFunc; + code = pushdownCastForUnion(pCxt, pSetOperator->pRight, pRightExpr, index); + if (TSDB_CODE_SUCCESS != code ) return code; } else if (comp < 0) { SNode* pLeftFunc = NULL; int32_t code = createCastFunc(pCxt, pLeft, pRightExpr->resType, &pLeftFunc); @@ -7439,6 +7489,8 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS snprintf(pLeftFuncExpr->userAlias, sizeof(pLeftFuncExpr->userAlias), "%s", pLeftExpr->userAlias); pLeft = pLeftFunc; pLeftExpr = pLeftFuncExpr; + code = pushdownCastForUnion(pCxt, pSetOperator->pLeft, pLeftExpr, index); + if (TSDB_CODE_SUCCESS != code ) return code; } snprintf(pRightExpr->aliasName, sizeof(pRightExpr->aliasName), "%s", pLeftExpr->aliasName); SNode* pProj = createSetOperProject(pSetOperator->stmtName, pLeft); diff --git a/tests/army/query/queryBugs.py b/tests/army/query/queryBugs.py index cd61b8c620..e6b20addb9 100644 --- a/tests/army/query/queryBugs.py +++ b/tests/army/query/queryBugs.py @@ -325,12 +325,97 @@ class TDTestCase(TBase): tdSql.query("select * from t1 where ts > '2025-01-01 00:00:00';") tdSql.checkRows(0) + def FIX_TS_6058(self): + tdSql.execute("create database iot_60j_production_eqp;") + tdSql.execute("create table iot_60j_production_eqp.realtime_data_collections (device_time TIMESTAMP, item_value VARCHAR(64), \ + upload_time TIMESTAMP) tags(bu_id VARCHAR(64), district_id VARCHAR(64), factory_id VARCHAR(64), production_line_id VARCHAR(64), \ + production_processes_id VARCHAR(64), work_center_id VARCHAR(64), station_id VARCHAR(64), device_name VARCHAR(64), item_name VARCHAR(64));") + + sub1 = " SELECT '实际速度' as name, 0 as rank, '当月' as cycle,\ + CASE \ + WHEN COUNT(item_value) = 0 THEN NULL\ + ELSE AVG(CAST(item_value AS double))\ + END AS item_value\ + FROM iot_60j_production_eqp.realtime_data_collections\ + WHERE device_time >= TO_TIMESTAMP(CONCAT(substring(TO_CHAR(today ,'YYYY-MM-dd'), 1,7), '-01 00:00:00'), 'YYYY-mm-dd')\ + AND item_name = 'Premixer_SpindleMotor_ActualSpeed' " + + sub2 = " SELECT '实际速度' as name, 3 as rank, 
TO_CHAR(TODAY(),'YYYY-MM-dd') as cycle,\ + CASE \ + WHEN COUNT(item_value) = 0 THEN NULL\ + ELSE AVG(CAST(item_value AS double))\ + END AS item_value\ + FROM iot_60j_production_eqp.realtime_data_collections\ + WHERE device_time >= TODAY()-1d and device_time <= now()\ + AND item_name = 'Premixer_SpindleMotor_ActualSpeed' " + + sub3 = " SELECT '设定速度' as name, 1 as rank, CAST(CONCAT('WEEK-',CAST(WEEKOFYEAR(TODAY()-1w) as VARCHAR)) as VARCHAR) as cycle,\ + CASE \ + WHEN COUNT(item_value) = 0 THEN NULL\ + ELSE AVG(CAST(item_value AS double))\ + END AS item_value\ + FROM iot_60j_production_eqp.realtime_data_collections\ + where \ + item_name = 'Premixer_SpindleMotor_SettingSpeed'\ + AND (\ + (WEEKDAY(now) = 0 AND device_time >= today()-8d and device_time <= today()-1d) OR\ + (WEEKDAY(now) = 1 AND device_time >= today()-9d and device_time <= today()-2d) OR\ + (WEEKDAY(now) = 2 AND device_time >= today()-10d and device_time <= today()-3d) OR\ + (WEEKDAY(now) = 3 AND device_time >= today()-11d and device_time <= today()-4d) OR\ + (WEEKDAY(now) = 4 AND device_time >= today()-12d and device_time <= today()-5d) OR\ + (WEEKDAY(now) = 5 AND device_time >= today()-13d and device_time <= today()-6d) OR\ + (WEEKDAY(now) = 6 AND device_time >= today()-14d and device_time <= today()-7d)\ + ) " + + sub4 = " SELECT '设定速度2' as name, 1 as rank, CAST(CONCAT('WEEK-',CAST(WEEKOFYEAR(TODAY()-1w) as VARCHAR)) as VARCHAR(5000)) as cycle,\ + CASE \ + WHEN COUNT(item_value) = 0 THEN NULL\ + ELSE AVG(CAST(item_value AS double))\ + END AS item_value\ + FROM iot_60j_production_eqp.realtime_data_collections\ + where \ + item_name = 'Premixer_SpindleMotor_SettingSpeed'\ + AND (\ + (WEEKDAY(now) = 0 AND device_time >= today()-8d and device_time <= today()-1d) OR\ + (WEEKDAY(now) = 1 AND device_time >= today()-9d and device_time <= today()-2d) OR\ + (WEEKDAY(now) = 2 AND device_time >= today()-10d and device_time <= today()-3d) OR\ + (WEEKDAY(now) = 3 AND device_time >= today()-11d and device_time <= today()-4d) OR\ + (WEEKDAY(now) = 4 AND device_time >= today()-12d and device_time <= today()-5d) OR\ + (WEEKDAY(now) = 5 AND device_time >= today()-13d and device_time <= today()-6d) OR\ + (WEEKDAY(now) = 6 AND device_time >= today()-14d and device_time <= today()-7d)\ + ) " + for uiontype in ["union" ,"union all"]: + repeatLines = 1 + if uiontype == "union": + repeatLines = 0 + for i in range(1, 10): + tdLog.debug(f"test: realtime_data_collections {i} times...") + tdSql.query(f"select name,cycle,item_value from ( {sub1} {uiontype} {sub2} {uiontype} {sub3}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(3) + tdSql.query(f"select name,cycle,item_value from ( {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(3) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(3) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(3) + tdSql.query(f"select name,cycle,item_value from ( {sub2} {uiontype} {sub4} {uiontype} {sub1}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(3) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(4) + tdSql.query(f"select name,cycle,item_value from ( {sub2} {uiontype} {sub3} {uiontype} {sub1} 
{uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(4) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub4} {uiontype} {sub1} {uiontype} {sub2}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(4) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub4} {uiontype} {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(4 + repeatLines) + tdSql.query(f"select name,cycle,item_value from ( {sub3} {uiontype} {sub2} {uiontype} {sub1} {uiontype} {sub2} {uiontype} {sub4}) order by rank,name,cycle;", queryTimes = 1) + tdSql.checkRows(4 + repeatLines) + # run def run(self): tdLog.debug(f"start to excute {__file__}") self.ts5946() - # TD BUGS self.FIX_TD_30686() self.FIX_TD_31684() @@ -340,6 +425,7 @@ class TDTestCase(TBase): self.FIX_TS_5143() self.FIX_TS_5239() self.FIX_TS_5984() + self.FIX_TS_6058() tdLog.success(f"{__file__} successfully executed") From fac18aa80694b7766068da973da946047f4d1c95 Mon Sep 17 00:00:00 2001 From: Feng Chao Date: Fri, 28 Feb 2025 14:52:57 +0800 Subject: [PATCH 19/21] ci: add linux/mac ci build and test workflow --- .github/workflows/taosd-ci.yml | 316 ++++++++++++++++++++++++++ .github/workflows/taosd-doc-build.yml | 74 +----- 2 files changed, 327 insertions(+), 63 deletions(-) create mode 100644 .github/workflows/taosd-ci.yml diff --git a/.github/workflows/taosd-ci.yml b/.github/workflows/taosd-ci.yml new file mode 100644 index 0000000000..022e4bcab2 --- /dev/null +++ b/.github/workflows/taosd-ci.yml @@ -0,0 +1,316 @@ + +name: TDengine CI Pipeline + +on: + pull_request: + branches: + - 'main' + - '3.0' + - '3.1' + paths-ignore: + - 'packaging/**' + - 'docs/**' + repository_dispatch: + types: [run-tests] + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + CONTAINER_NAME: 'taosd-test' + WKDIR: '/var/lib/jenkins/workspace' + WK: '/var/lib/jenkins/workspace/TDinternal' + WKC: '/var/lib/jenkins/workspace/TDinternal/community' + +jobs: + fetch-parameters: + runs-on: + group: CI + labels: [self-hosted, Linux, X64, testing] + outputs: + tdinternal: ${{ steps.parameters.outputs.tdinternal }} + run_function_test: ${{ steps.parameters.outputs.run_function_test }} + run_tdgpt_test: ${{ steps.parameters.outputs.run_tdgpt_test }} + source_branch: ${{ steps.parameters.outputs.source_branch }} + target_branch: ${{ steps.parameters.outputs.target_branch }} + pr_number: ${{ steps.parameters.outputs.pr_number }} + steps: + - name: Determine trigger source and fetch parameters + id: parameters + run: | + set -euo pipefail + # check the trigger source and get branch information + if [ "${{ github.event_name }}" == "repository_dispatch" ]; then + tdinternal="true" + source_branch=${{ github.event.client_payload.tdinternal_source_branch }} + target_branch=${{ github.event.client_payload.tdinternal_target_branch }} + pr_number=${{ github.event.client_payload.tdinternal_pr_number }} + run_tdgpt_test="true" + run_function_test="true" + else + tdinternal="false" + source_branch=${{ github.event.pull_request.head.ref }} + target_branch=${{ github.event.pull_request.base.ref }} + pr_number=${{ github.event.pull_request.number }} + + # check whether to run tdgpt test cases + cd ${{ env.WKC }} + changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD $target_branch`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :) + + if [[ "$changed_files_non_doc" != 
'' && "$changed_files_non_doc" =~ /forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics/ ]]; then + run_tdgpt_test="true" + else + run_tdgpt_test="false" + fi + + # check whether to run function test cases + changed_files_non_tdgpt=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD $target_branch`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -Ev "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics" | tr '\n' ' ' ||:) + if [ $changed_files_non_tdgpt != '' ]; then + run_function_test="true" + else + run_function_test="false" + fi + fi + + echo "tdinternal=$tdinternal" >> $GITHUB_OUTPUT + echo "run_function_test=$run_function_test" >> $GITHUB_OUTPUT + echo "run_tdgpt_test=$run_tdgpt_test" >> $GITHUB_OUTPUT + echo "source_branch=$source_branch" >> $GITHUB_OUTPUT + echo "target_branch=$target_branch" >> $GITHUB_OUTPUT + echo "pr_number=$pr_number" >> $GITHUB_OUTPUT + + run-tests-on-linux: + needs: fetch-parameters + runs-on: + group: CI + labels: [self-hosted, Linux, X64, testing] + timeout-minutes: 200 + env: + IS_TDINTERNAL: ${{ needs.fetch-parameters.outputs.tdinternal }} + RUN_RUNCTION_TEST: ${{ needs.fetch-parameters.outputs.run_function_test }} + RUN_TDGPT_TEST: ${{ needs.fetch-parameters.outputs.run_tdgpt_tests }} + SOURCE_BRANCH: ${{ needs.fetch-parameters.outputs.source_branch }} + TARGET_BRANCH: ${{ needs.fetch-parameters.outputs.target_branch }} + PR_NUMBER: ${{ needs.fetch-parameters.outputs.pr_number }} + steps: + - name: Output the environment information + run: | + echo "::group::Environment Info" + date + hostname + env + echo "Runner: ${{ runner.name }}" + echo "Trigger Source from TDinternal: ${{ env.IS_TDINTERNAL }}" + echo "Workspace: ${{ env.WKDIR }}" + git --version + echo "${{ env.WKDIR }}/restore.sh -p ${{ env.PR_NUMBER }} -n ${{ github.run_number }} -c ${{ env.CONTAINER_NAME }}" + echo "::endgroup::" + + - name: Prepare repositories + run: | + set -euo pipefail + prepare_environment() { + cd "$1" + git reset --hard + git clean -f + git remote prune origin + git fetch + git checkout "$2" + } + prepare_environment "${{ env.WK }}" "${{ env.TARGET_BRANCH }}" + prepare_environment "${{ env.WKC }}" "${{ env.TARGET_BRANCH }}" + + - name: Get latest codes and logs for TDinternal PR + if: ${{ env.IS_TDINTERNAL == 'true' }} + run: | + cd ${{ env.WK }} + git pull >/dev/null + git log -5 + echo "`date "+%Y%m%d-%H%M%S"` TDinternalTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log + echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log + echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log + git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge + git checkout -qf FETCH_HEAD + git log -5 + echo "TDinternal log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log + cd ${{ env.WKC }} + git remote prune origin + git pull >/dev/null + git log -5 + echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log + - name: Get latest codes and logs for TDengine PR + if: ${{ env.IS_TDINTERNAL == 'false' }} + run: | + cd ${{ env.WKC }} + git remote prune origin + git pull >/dev/null + git log -5 + echo "`date "+%Y%m%d-%H%M%S"` TDengineTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log + echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log + echo "community log: `git log -5`" >>${{ env.WKDIR 
}}/jenkins.log + git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge + git checkout -qf FETCH_HEAD + git log -5 + echo "community log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log + cd ${{ env.WK }} + git pull >/dev/null + git log -5 + echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log + - name: Update submodule + run: | + cd ${{ env.WKC }} + git submodule update --init --recursive + - name: Output the 'file_no_doc_changed' information to the file + if: ${{ env.IS_TDINTERNAL == 'false' }} + run: | + mkdir -p ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }} + changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ env.TARGET_BRANCH }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :) + echo $changed_files_non_doc > ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }}/docs_changed.txt + - name: Check assert testing + run: | + cd ${{ env.WKC }}/tests/parallel_test + ./run_check_assert_container.sh -d ${{ env.WKDIR }} + - name: Check void function testing + run: | + cd ${{ env.WKC }}/tests/parallel_test + ./run_check_void_container.sh -d ${{ env.WKDIR }} + - name: Build docker container + run: | + date + rm -rf ${{ env.WKC }}/debug + cd ${{ env.WKC }}/tests/parallel_test + time ./container_build.sh -w ${{ env.WKDIR }} -e + - name: Get parameters for testing + id: get_param + run: | + log_server_file="/home/log_server.json" + timeout_cmd="" + extra_param="" + + if [ -f "$log_server_file" ]; then + log_server_enabled=$(jq '.enabled' "$log_server_file") + timeout_param=$(jq '.timeout' "$log_server_file") + if [ "$timeout_param" != "null" ] && [ "$timeout_param" != "0" ]; then + timeout_cmd="timeout $timeout_param" + fi + + if [ "$log_server_enabled" == "1" ]; then + log_server=$(jq '.server' "$log_server_file" | sed 's/\\\"//g') + if [ "$log_server" != "null" ] && [ "$log_server" != "" ]; then + extra_param="-w $log_server" + fi + fi + fi + echo "timeout_cmd=$timeout_cmd" >> $GITHUB_OUTPUT + echo "extra_param=$extra_param" >> $GITHUB_OUTPUT + - name: Run function returns with a null pointer scan testing + run: | + cd ${{ env.WKC }}/tests/parallel_test + ./run_scan_container.sh -d ${{ env.WKDIR }} -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -f ${{ env.WKDIR }}/tmp/${{ env.PR_NUMBER }}_${{ github.run_number }}/docs_changed.txt ${{ steps.get_param.outputs.extra_param }} + - name: Run tdgpt test cases + if: ${{ env.IS_TDINTERNAL }} == 'false' && ${{ env.RUN_TDGPT_TEST }} == 'true' + run: | + cd ${{ env.WKC }}/tests/parallel_test + export DEFAULT_RETRY_TIME=2 + date + timeout 600 time ./run.sh -e -m /home/m.json -t tdgpt_cases.task -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -l ${{ env.WKDIR }}/log -o 300 ${{ steps.get_param.outputs.extra_param }} + - name: Run function test cases + if: ${{ env.RUN_RUNCTION_TEST }} == 'true' + run: | + cd ${{ env.WKC }}/tests/parallel_test + export DEFAULT_RETRY_TIME=2 + date + ${{ steps.get_param.outputs.timeout_cmd }} time ./run.sh -e -m /home/m.json -t cases.task -b ${{ env.PR_NUMBER }}_${{ github.run_number }} -l ${{ env.WKDIR }}/log -o 1200 ${{ steps.get_param.outputs.extra_param }} + + run-tests-on-mac: + needs: fetch-parameters + if: ${{ needs.fetch-parameters.outputs.run_function_test == 'false' }} + runs-on: + group: CI + labels: [self-hosted, macOS, ARM64, testing] + timeout-minutes: 60 + env: + IS_TDINTERNAL: ${{ needs.fetch-parameters.outputs.tdinternal }} + SOURCE_BRANCH: ${{ needs.fetch-parameters.outputs.source_branch 
+
+  run-tests-on-mac:
+    needs: fetch-parameters
+    if: ${{ needs.fetch-parameters.outputs.run_function_test == 'false' }}
+    runs-on:
+      group: CI
+      labels: [self-hosted, macOS, ARM64, testing]
+    timeout-minutes: 60
+    env:
+      IS_TDINTERNAL: ${{ needs.fetch-parameters.outputs.tdinternal }}
+      SOURCE_BRANCH: ${{ needs.fetch-parameters.outputs.source_branch }}
+      TARGET_BRANCH: ${{ needs.fetch-parameters.outputs.target_branch }}
+      PR_NUMBER: ${{ needs.fetch-parameters.outputs.pr_number }}
+    steps:
+      - name: Output the environment information
+        run: |
+          echo "::group::Environment Info"
+          date
+          hostname
+          env
+          echo "Runner: ${{ runner.name }}"
+          echo "Trigger Source from TDinternal: ${{ env.IS_TDINTERNAL }}"
+          echo "Workspace: ${{ env.WKDIR }}"
+          git --version
+          echo "${{ env.WKDIR }}/restore.sh -p ${{ env.PR_NUMBER }} -n ${{ github.run_number }} -c ${{ env.CONTAINER_NAME }}"
+          echo "::endgroup::"
+      - name: Prepare repositories
+        run: |
+          set -euo pipefail
+          prepare_environment() {
+            cd "$1"
+            git reset --hard
+            git clean -f
+            git remote prune origin
+            git fetch
+            git checkout "$2"
+          }
+          prepare_environment "${{ env.WK }}" "${{ env.TARGET_BRANCH }}"
+          prepare_environment "${{ env.WKC }}" "${{ env.TARGET_BRANCH }}"
+      - name: Get latest codes and logs for TDinternal PR
+        if: ${{ env.IS_TDINTERNAL == 'true' }}
+        run: |
+          cd ${{ env.WK }}
+          git pull >/dev/null
+          git log -5
+          echo "`date "+%Y%m%d-%H%M%S"` TDinternalTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
+          echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
+          echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+          git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
+          git checkout -qf FETCH_HEAD
+          git log -5
+          echo "TDinternal log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+          cd ${{ env.WKC }}
+          git remote prune origin
+          git pull >/dev/null
+          git log -5
+          echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+      - name: Get latest codes and logs for TDengine PR
+        if: ${{ env.IS_TDINTERNAL == 'false' }}
+        run: |
+          cd ${{ env.WKC }}
+          git remote prune origin
+          git pull >/dev/null
+          git log -5
+          echo "`date "+%Y%m%d-%H%M%S"` TDengineTest/${{ env.PR_NUMBER }}:${{ github.run_number }}:${{ env.TARGET_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
+          echo "CHANGE_BRANCH:${{ env.SOURCE_BRANCH }}" >>${{ env.WKDIR }}/jenkins.log
+          echo "community log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+          git fetch origin +refs/pull/${{ env.PR_NUMBER }}/merge
+          git checkout -qf FETCH_HEAD
+          git log -5
+          echo "community log merged: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+          cd ${{ env.WK }}
+          git pull >/dev/null
+          git log -5
+          echo "TDinternal log: `git log -5`" >>${{ env.WKDIR }}/jenkins.log
+      - name: Update submodule
+        run: |
+          cd ${{ env.WKC }}
+          git submodule update --init --recursive
+      - name: Run tests
+        run: |
+          date
+          cd ${{ env.WK }}
+          rm -rf debug
+          mkdir debug
+          cd ${{ env.WK }}/debug
+          echo $PATH
+          echo "PATH=/opt/homebrew/bin:$PATH" >> $GITHUB_ENV
+          cmake .. -DBUILD_TEST=true -DBUILD_HTTPS=false -DCMAKE_BUILD_TYPE=Release
+          make -j10
+          ctest -j10 || exit 7
+          date
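
Note: both jobs compute changed files against the merge base rather than against the tip of the target branch, so commits that landed on the target after the PR branched are not miscounted as PR changes. A minimal sketch of the idiom, assuming FETCH_HEAD already points at the PR merge commit and using a placeholder branch name:

    target_branch="3.0"   # placeholder
    base=$(git merge-base FETCH_HEAD "$target_branch")
    git --no-pager diff --name-only FETCH_HEAD "$base" \
      | grep -v '^docs/en/' | grep -v '^docs/zh/' | grep -v '\.md$' || :
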
diff --git a/.github/workflows/taosd-doc-build.yml b/.github/workflows/taosd-doc-build.yml
index 52a35fc3d5..1dffaa0aa7 100644
--- a/.github/workflows/taosd-doc-build.yml
+++ b/.github/workflows/taosd-doc-build.yml
@@ -1,17 +1,14 @@
 name: TDengine Doc Build

 on:
-  workflow_call:
-    inputs:
-      target_branch:
-        description: "Target branch name of for building the document"
-        required: true
-        type: string
-
-      target_pr_number:
-        description: "PR number of target branch to merge for building the document"
-        required: true
-        type: string
+  pull_request:
+    branches:
+      - 'main'
+      - '3.0'
+      - '3.1'
+    paths:
+      - 'docs/**'
+      - '*.md'

 env:
   DOC_WKC: "/root/doc_ci_work"
@@ -21,81 +18,32 @@ env:
   TOOLS_REPO: "taos-tools"

 jobs:
-  check:
-    runs-on:
-      group: CI
-      labels: [self-hosted, doc-build]
-    outputs:
-      changed_files_zh: ${{ steps.set_output.outputs.changed_files_zh }}
-      changed_files_en: ${{ steps.set_output.outputs.changed_files_en }}
-      changed_files_non_doc: ${{ steps.set_output.outputs.changed_files_non_doc }}
-      changed_files_non_tdgpt: ${{ steps.set_output.outputs.changed_files_non_tdgpt }}
-    steps:
-      - name: Get the latest document contents from the repository
-        run: |
-          set -e
-          # ./.github/scripts/update_repo.sh ${{ env.DOC_WKC }}/${{ env.TD_REPO }} ${{ inputs.target_branch }} ${{ inputs.target_pr_number }}
-          cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
-          git reset --hard
-          git clean -f
-          git remote prune origin
-          git fetch
-          git checkout ${{ inputs.target_branch }}
-          git pull >/dev/null
-          git fetch origin +refs/pull/${{ inputs.target_pr_number }}/merge
-          git checkout -qf FETCH_HEAD
-      - name: Check whether the document is changed and set output variables
-        id: set_output
-        run: |
-          set -e
-          cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
-          changed_files_zh=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`| grep "^docs/zh/" | tr '\n' ' ' || :)
-          changed_files_en=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`| grep "^docs/en/" | tr '\n' ' ' || :)
-          changed_files_non_doc=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | tr '\n' ' ' || :)
-          changed_files_non_tdgpt=$(git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${{ inputs.tartget_branch }}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -Ev "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics" | tr '\n' ' ' ||:)
-          echo "changed_files_zh=${changed_files_zh}" >> $GITHUB_OUTPUT
-          echo "changed_files_en=${changed_files_en}" >> $GITHUB_OUTPUT
-          echo "changed_files_non_doc=${changed_files_non_doc}" >> $GITHUB_OUTPUT
-          echo "changed_files_non_tdgpt=${changed_files_non_tdgpt}" >> $GITHUB_OUTPUT
-
-  build:
-    needs: check
+  build-doc:
     runs-on:
       group: CI
       labels: [self-hosted, doc-build]
-    if: ${{ needs.check.outputs.changed_files_zh != '' || needs.check.outputs.changed_files_en != '' }}
-
     steps:
       - name: Get the latest document contents
         run: |
           set -e
-          #./.github/scripts/update_repo.sh ${{ env.DOC_WKC }}/${{ env.TD_REPO }} ${{ inputs.target_branch }} ${{ inputs.target_pr_number }}
           cd ${{ env.DOC_WKC }}/${{ env.TD_REPO }}
           git reset --hard
           git clean -f
           git remote prune origin
           git fetch
-          git checkout ${{ inputs.target_branch }}
+          git checkout ${{ github.event.pull_request.base.ref }}
           git pull >/dev/null
-          git fetch origin +refs/pull/${{ inputs.target_pr_number }}/merge
+          git fetch origin +refs/pull/${{ github.event.pull_request.number }}/merge
           git checkout -qf FETCH_HEAD
       - name: Build the chinese document
-        if: ${{ needs.check.outputs.changed_files_zh != '' }}
         run: |
           cd ${{ env.DOC_WKC }}/${{ env.ZH_DOC_REPO }}
           yarn ass local
           yarn build
       - name: Build the english document
-        if: ${{ needs.check.outputs.changed_files_en != '' }}
         run: |
           cd ${{ env.DOC_WKC }}/${{ env.EN_DOC_REPO }}
           yarn ass local
           yarn build
-
-  outputs:
-    changed_files_zh: ${{ needs.check.outputs.changed_files_zh }}
-    changed_files_en: ${{ needs.check.outputs.changed_files_en }}
-    changed_files_non_doc: ${{ needs.check.outputs.changed_files_non_doc }}
-    changed_files_non_tdgpt: ${{ needs.check.outputs.changed_files_non_tdgpt }}
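
Note: every checkout sequence in these workflows fetches GitHub's precomputed merge ref, so CI tests the PR as it would look after merging, not the raw PR head. A minimal sketch of that idiom, with a placeholder PR number:

    PR_NUMBER=12345                                  # placeholder
    git fetch origin "+refs/pull/${PR_NUMBER}/merge"
    git checkout -qf FETCH_HEAD                      # detached HEAD at the merge preview
    git log -1 --oneline                             # confirm which commit is being built
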
From 5f6ecab85443cd161ea13d39fc00911e0c6144ad Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Fri, 28 Feb 2025 15:05:00 +0800
Subject: [PATCH 20/21] fix(stream): check the return values.

---
 source/dnode/vnode/src/tq/tqStreamTask.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c
index c8862b03ad..7c65f805e9 100644
--- a/source/dnode/vnode/src/tq/tqStreamTask.c
+++ b/source/dnode/vnode/src/tq/tqStreamTask.c
@@ -322,7 +322,10 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
   // check whether input queue is full or not
   if (streamQueueIsFull(pTask->inputq.queue)) {
     tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
-    streamTrySchedExec(pTask);
+    int32_t code = streamTrySchedExec(pTask);
+    if (code) {
+      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
+    }
     return false;
   }

From d7560846b168036512a23dcd8621bef7f0b966d1 Mon Sep 17 00:00:00 2001
From: WANG Xu
Date: Fri, 28 Feb 2025 18:07:10 +0800
Subject: [PATCH 21/21] Update .github/workflows/taosd-ci.yml

---
 .github/workflows/taosd-ci.yml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/workflows/taosd-ci.yml b/.github/workflows/taosd-ci.yml
index 022e4bcab2..5be958adf7 100644
--- a/.github/workflows/taosd-ci.yml
+++ b/.github/workflows/taosd-ci.yml
@@ -1,5 +1,4 @@
-
-name: TDengine CI Pipeline
+name: TDengine CI Test

 on:
   pull_request: