diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 3cf0382eba..20618dbdf3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -213,13 +213,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) taosQueueGetThreadId(pVnode->pApplyW.queue)); tMultiWorkerCleanup(&pVnode->pApplyW); - dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); - while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); - dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, taosQueueGetThreadId(pVnode->pFetchQ)); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + tqNotifyClose(pVnode->pImpl->pTq); dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 8f2c0b5a5e..659ba3f777 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -496,6 +496,7 @@ void metaULock(SMeta *pMeta) { static void metaCleanup(SMeta **ppMeta) { SMeta *pMeta = *ppMeta; if (pMeta) { + metaInfo("vgId:%d meta clean up, path:%s", TD_VID(pMeta->pVnode), pMeta->path); if (pMeta->pEnv) metaAbort(pMeta); if (pMeta->pCache) metaCacheClose(pMeta); #ifdef BUILD_NO_CALL diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3c50e91275..5583e464ed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3064,9 +3064,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI iMax[nMax] = i; max[nMax++] = pIter->input[i].pRow; - } else { - pIter->input[i].next = false; } + pIter->input[i].next = false; } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 0011d1c70c..69014d5b1c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -502,6 +502,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int } int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + int32_t code = 0; int8_t status = 0; bool queryDone = false; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7180c58404..ddc4812b55 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -329,7 +329,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (len < 0) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); - QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } if (len == 0) { @@ -337,18 +337,18 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, code = dsGetDataBlock(ctx->sinkHandle, &output); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); + QW_ERR_JRET(code); } QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); if (!ctx->dynamicTask) { - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); } if (NULL == pRsp) { - QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp)); + QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp)); *pOutput = output; } else { pOutput->queryEnd = output.queryEnd; @@ -368,7 +368,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *dataLen += len + PAYLOAD_PREFIX_LEN; *pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN; - QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp)); + QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp)); // set the serialize start position output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN); @@ -380,7 +380,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, code = dsGetDataBlock(ctx->sinkHandle, &output); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); + QW_ERR_JRET(code); } pOutput->queryEnd = output.queryEnd; @@ -399,7 +399,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); break; } @@ -416,8 +416,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } } +_return: + *rspMsg = pRsp; - return TSDB_CODE_SUCCESS; + + return code; } int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { @@ -472,6 +475,12 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput); } + if (code) { + qwFreeFetchRsp(rsp); + rsp = NULL; + dataLen = 0; + } + if (NULL == rsp && TSDB_CODE_SUCCESS == code) { return TSDB_CODE_SUCCESS; } @@ -877,10 +886,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { break; } + qwFreeFetchRsp(rsp); + rsp = NULL; + if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwFreeFetchRsp(rsp); - rsp = NULL; qwMsg->connInfo = ctx->dataConnInfo; code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); @@ -1432,6 +1442,8 @@ void qWorkerDestroy(void **qWorkerMgmt) { while (0 == destroyed) { taosMsleep(2); } + + *qWorkerMgmt = NULL; } int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cfe88138ef..09216add82 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -223,6 +223,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-32548.py ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False @@ -1254,7 +1255,7 @@ ,,y,script,./test.sh -f tsim/query/unionall_as_table.sim ,,y,script,./test.sh -f tsim/query/multi_order_by.sim ,,y,script,./test.sh -f tsim/query/sys_tbname.sim -,,y,script,./test.sh -f tsim/query/sort-pre-cols.sim +,,y,script,./test.sh -f tsim/query/sort-pre-cols.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/groupby_distinct.sim ,,y,script,./test.sh -f tsim/query/event.sim @@ -1262,7 +1263,7 @@ ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/emptyTsRange_scl.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim -,,y,script,./test.sh -f tsim/query/tableCount.sim +,,y,script,./test.sh -f tsim/query/tableCount.sim ,,y,script,./test.sh -f tsim/query/show_db_table_kind.sim ,,y,script,./test.sh -f tsim/query/bi_star_table.sim ,,y,script,./test.sh -f tsim/query/bi_tag_scan.sim @@ -1532,8 +1533,8 @@ ,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim ,,y,script,./test.sh -f tsim/tagindex/indexOverflow.sim ,,y,script,./test.sh -f tsim/view/view.sim -,,y,script,./test.sh -f tsim/query/cache_last.sim -,,y,script,./test.sh -f tsim/query/const.sim +,,y,script,./test.sh -f tsim/query/cache_last.sim +,,y,script,./test.sh -f tsim/query/const.sim ,,y,script,./test.sh -f tsim/query/nestedJoinView.sim @@ -1566,4 +1567,3 @@ ,,n,docs-examples-test,bash rust.sh ,,n,docs-examples-test,bash go.sh ,,n,docs-examples-test,bash test_R.sh - diff --git a/tests/system-test/2-query/td-32548.py b/tests/system-test/2-query/td-32548.py new file mode 100644 index 0000000000..45611b8372 --- /dev/null +++ b/tests/system-test/2-query/td-32548.py @@ -0,0 +1,32 @@ +from util.cases import * +from util.sql import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + tdSql.execute("drop database if exists td_32548;") + tdSql.execute("create database td_32548 cachemodel 'last_row' keep 3650,3650,3650;") + + def run(self): + tdSql.execute("use td_32548;") + + tdSql.execute("create table ntb1 (ts timestamp, ival int);") + tdSql.execute("insert into ntb1 values ('2024-07-08 17:54:49.675', 54);") + + tdSql.execute("flush database td_32548;") + + tdSql.execute("insert into ntb1 values ('2024-07-08 17:53:49.675', 53);") + tdSql.execute("insert into ntb1 values ('2024-07-08 17:52:49.675', 52);") + tdSql.execute("delete from ntb1 where ts = '2024-07-08 17:54:49.675';") + + tdSql.query('select last_row(ts) from ntb1;') + tdSql.checkData(0, 0, '2024-07-08 17:53:49.675') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())