Merge remote-tracking branch 'origin/main' into merge/3340

This commit is contained in:
Shengliang Guan 2024-10-24 10:16:26 +08:00
commit f59f3901c0
7 changed files with 68 additions and 20 deletions

View File

@ -213,13 +213,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
taosQueueGetThreadId(pVnode->pApplyW.queue));
tMultiWorkerCleanup(&pVnode->pApplyW);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
taosQueueGetThreadId(pVnode->pFetchQ));
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
tqNotifyClose(pVnode->pImpl->pTq);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

View File

@ -496,6 +496,7 @@ void metaULock(SMeta *pMeta) {
static void metaCleanup(SMeta **ppMeta) {
SMeta *pMeta = *ppMeta;
if (pMeta) {
metaInfo("vgId:%d meta clean up, path:%s", TD_VID(pMeta->pVnode), pMeta->path);
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
#ifdef BUILD_NO_CALL

View File

@ -3064,9 +3064,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
iMax[nMax] = i;
max[nMax++] = pIter->input[i].pRow;
} else {
pIter->input[i].next = false;
}
pIter->input[i].next = false;
}
}

View File

@ -502,6 +502,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
}
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;

View File

@ -329,7 +329,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (len == 0) {
@ -337,18 +337,18 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
QW_ERR_JRET(code);
}
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
if (!ctx->dynamicTask) {
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
}
if (NULL == pRsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
@ -368,7 +368,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*dataLen += len + PAYLOAD_PREFIX_LEN;
*pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN;
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
// set the serialize start position
output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN);
@ -380,7 +380,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
QW_ERR_JRET(code);
}
pOutput->queryEnd = output.queryEnd;
@ -399,7 +399,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
break;
}
@ -416,8 +416,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
}
_return:
*rspMsg = pRsp;
return TSDB_CODE_SUCCESS;
return code;
}
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
@ -472,6 +475,12 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput);
}
if (code) {
qwFreeFetchRsp(rsp);
rsp = NULL;
dataLen = 0;
}
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
return TSDB_CODE_SUCCESS;
}
@ -877,10 +886,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
break;
}
qwFreeFetchRsp(rsp);
rsp = NULL;
if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo;
code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
@ -1432,6 +1442,8 @@ void qWorkerDestroy(void **qWorkerMgmt) {
while (0 == destroyed) {
taosMsleep(2);
}
*qWorkerMgmt = NULL;
}
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {

View File

@ -223,6 +223,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-32548.py
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
@ -1254,7 +1255,7 @@
,,y,script,./test.sh -f tsim/query/unionall_as_table.sim
,,y,script,./test.sh -f tsim/query/multi_order_by.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/sort-pre-cols.sim
,,y,script,./test.sh -f tsim/query/sort-pre-cols.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/groupby_distinct.sim
,,y,script,./test.sh -f tsim/query/event.sim
@ -1262,7 +1263,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/emptyTsRange_scl.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/show_db_table_kind.sim
,,y,script,./test.sh -f tsim/query/bi_star_table.sim
,,y,script,./test.sh -f tsim/query/bi_tag_scan.sim
@ -1532,8 +1533,8 @@
,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim
,,y,script,./test.sh -f tsim/tagindex/indexOverflow.sim
,,y,script,./test.sh -f tsim/view/view.sim
,,y,script,./test.sh -f tsim/query/cache_last.sim
,,y,script,./test.sh -f tsim/query/const.sim
,,y,script,./test.sh -f tsim/query/cache_last.sim
,,y,script,./test.sh -f tsim/query/const.sim
,,y,script,./test.sh -f tsim/query/nestedJoinView.sim
@ -1566,4 +1567,3 @@
,,n,docs-examples-test,bash rust.sh
,,n,docs-examples-test,bash go.sh
,,n,docs-examples-test,bash test_R.sh

View File

@ -0,0 +1,32 @@
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
tdSql.execute("drop database if exists td_32548;")
tdSql.execute("create database td_32548 cachemodel 'last_row' keep 3650,3650,3650;")
def run(self):
tdSql.execute("use td_32548;")
tdSql.execute("create table ntb1 (ts timestamp, ival int);")
tdSql.execute("insert into ntb1 values ('2024-07-08 17:54:49.675', 54);")
tdSql.execute("flush database td_32548;")
tdSql.execute("insert into ntb1 values ('2024-07-08 17:53:49.675', 53);")
tdSql.execute("insert into ntb1 values ('2024-07-08 17:52:49.675', 52);")
tdSql.execute("delete from ntb1 where ts = '2024-07-08 17:54:49.675';")
tdSql.query('select last_row(ts) from ntb1;')
tdSql.checkData(0, 0, '2024-07-08 17:53:49.675')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())