Merge pull request #29219 from taosdata/fix/job.remain2

fix: job remained issue cause of task dropped during pre-process phase
This commit is contained in:
Shengliang Guan 2024-12-20 09:03:36 +08:00 committed by GitHub
commit 1ce6f4a383
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 7 deletions

View File

@ -11,6 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom nodes PRIVATE os util transport qcom nodes
) )
# if(${BUILD_TEST}) #if(${BUILD_TEST})
# ADD_SUBDIRECTORY(test) # ADD_SUBDIRECTORY(test)
# endif(${BUILD_TEST}) #endif(${BUILD_TEST})

View File

@ -162,7 +162,7 @@ void ctgTestInitLogFile() {
(void)ctgdEnableDebug("cache", true); (void)ctgdEnableDebug("cache", true);
(void)ctgdEnableDebug("lock", true); (void)ctgdEnableDebug("lock", true);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { if (taosInitLog(defaultLogFileNamePrefix, 1, false) < 0) {
(void)printf("failed to open log file in directory:%s\n", tsLogDir); (void)printf("failed to open log file in directory:%s\n", tsLogDir);
ASSERT(0); ASSERT(0);
} }

View File

@ -258,7 +258,7 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
} }
} }
atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1); (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);
if (acquire && ctx) { if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
@ -283,7 +283,7 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
taosDisableMemPoolUsage(); taosDisableMemPoolUsage();
atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1); (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1);
qDebug("task handle destroyed"); qDebug("task handle destroyed");
} }
@ -297,7 +297,7 @@ void qwFreeSinkHandle(SQWTaskCtx *ctx) {
dsDestroyDataSinker(osinkHandle); dsDestroyDataSinker(osinkHandle);
QW_SINK_DISABLE_MEMPOOL(); QW_SINK_DISABLE_MEMPOOL();
atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1); (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1);
qDebug("sink handle destroyed"); qDebug("sink handle destroyed");
} }
@ -409,6 +409,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) { if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) {
QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode); QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode);
} else {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->taskHandle, NULL);
@ -428,7 +430,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_TASK_DLOG_E("task ctx dropped"); QW_TASK_DLOG_E("task ctx dropped");
atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1); (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1);
return code; return code;
} }

View File

@ -763,6 +763,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, drop event:%d", QW_GET_EVENT(ctx, QW_EVENT_DROP));
QW_ERR_JRET(ctx->rspCode);
}
ctx->ctrlConnInfo = qwMsg->connInfo; ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->sId = sId; ctx->sId = sId;
ctx->phase = -1; ctx->phase = -1;

18
tests/script/api/test.sh Executable file
View File

@ -0,0 +1,18 @@
#!/bin/bash
make clean
make
pgrep taosd || taosd >> /dev/null 2>&1 &
sleep 10
./dbTableRoute localhost
./batchprepare localhost
./stmt-crash localhost
./insertSameTs localhost
./passwdTest localhost
./whiteListTest localhost
./tmqViewTest