From 152fc5daab94f1b418d1f9fcf2ae24b59da1a191 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 16:36:35 +0800 Subject: [PATCH 01/23] fix(stream):fix the race condition when creating new tsables. --- source/dnode/vnode/src/tq/tqExec.c | 4 +- source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorimpl.h | 23 ++- source/libs/executor/src/executil.c | 17 +- source/libs/executor/src/executor.c | 212 +++++++++++++----------- source/libs/executor/src/executorimpl.c | 1 + source/libs/executor/src/scanoperator.c | 32 ++-- 7 files changed, 156 insertions(+), 134 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f23b5f8526..18de4b75d0 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -68,14 +68,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs int32_t vgId = TD_VID(pTq->pVnode); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); + tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); + tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId); pRsp->rspOffset = *pOffset; return 0; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f99c7de93d..9b8f034e44 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); +int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3f519568c4..c269367eb2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,10 +143,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t fillHistoryVer1; int64_t fillHistoryVer2; - - // int8_t triggerSaved; - // int64_t deleteMarkSaved; - SStreamState* pState; + SStreamState* pState; } SStreamTaskInfo; typedef struct { @@ -168,15 +165,14 @@ typedef struct STaskStopInfo { } STaskStopInfo; struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - int32_t qbufQuota; // total available buffer (in KB) during execution query - - int64_t version; // used for stream to record wal version, why not move to sschemainfo + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + int32_t qbufQuota; // total available buffer (in KB) during execution query + int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; STableListInfo* pTableInfoList; // this is a table list @@ -188,6 +184,7 @@ struct SExecTaskInfo { SLocalFetch localFetch; SArray* pResultBlockList; // result block list STaskStopInfo stopInfo; + SRWLatch lock; // secure the access of STableListInfo }; enum { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 953d614951..347a38247e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -34,7 +34,7 @@ struct STableListInfo { int32_t numOfOuputGroups; // the data block will be generated one by one int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid + SHashObj* map; // speedup acquire the tableQueryInfo by table uid uint64_t suid; }; @@ -1800,6 +1800,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) return taosArrayGet(pTableList->pTableList, index); } +int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) { + int32_t numOfTables = taosArrayGetSize(pTableList->pTableList); + if (startIndex >= numOfTables) { + return -1; + } + + for (int32_t i = startIndex; i < numOfTables; ++i) { + STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i); + if (p->uid == uid) { + return i; + } + } + return -1; +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 85f17c0d53..5b71cdcac9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -410,6 +410,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + taosWLockLatch(&pTaskInfo->lock); for (int32_t i = 0; i < numOfQualifiedTables; ++i) { uint64_t* uid = taosArrayGet(qa, i); @@ -424,6 +425,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(keyBuf); taosArrayDestroy(qa); + taosWUnLockLatch(&pTaskInfo->lock); return code; } } @@ -445,6 +447,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); } + taosWUnLockLatch(&pTaskInfo->lock); if (keyBuf != NULL) { taosMemoryFree(keyBuf); } @@ -452,7 +455,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo taosArrayDestroy(qa); } else { // remove the table id in current list qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); + taosWLockLatch(&pTaskInfo->lock); code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList); + taosWUnLockLatch(&pTaskInfo->lock); } return code; @@ -1000,6 +1005,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { } return 0; } + bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.recoverScanFinished; @@ -1080,8 +1086,11 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { } int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + const char* id = GET_TASKID(pTaskInfo); + pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.returned = 0; @@ -1095,20 +1104,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if(pOperator->numOfDownstream != 1){ - qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); + qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); return -1; } pOperator = pOperator->pDownstream[0]; } SStreamScanInfo* pInfo = pOperator->info; + STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; + STableScanBase* pScanBaseInfo = &pScanInfo->base; + if (pOffset->type == TMQ_OFFSET__LOG) { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->base.dataReader); - pTSInfo->base.dataReader = NULL; + tsdbReaderClose(pScanBaseInfo->dataReader); + pScanBaseInfo->dataReader = NULL; + // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); + qError("tqSeekVer failed ver:%, %s" PRId64, pOffset->version + 1, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1117,120 +1129,120 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; + // this value may be changed if new tables are created + taosRLockLatch(&pTaskInfo->lock); + int32_t numOfTables = tableListGetSize(pTableListInfo); + if (uid == 0) { - if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { - STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + if (numOfTables != 0) { + STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; } else { - qError("uid == 0 and tablelist size is 0"); + taosRUnLockLatch(&pTaskInfo->lock); + qError("no table in table list, %s", id); return -1; } } - /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ - /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); - - qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); pInfo->pTableScanOp->resultInfo.totalRows = 0; - bool found = false; - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; - break; - } - } + // start from current accessed position + int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + taosRUnLockLatch(&pTaskInfo->lock); - // TODO after dropping table, table may not found - if(!found){ - qError("uid not found in tablelist %" PRId64, uid); + if (index >= 0) { + pScanInfo->currentTable = index; + } else { + qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id); return -1; } - if (pTableScanInfo->base.dataReader == NULL) { - STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); - int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); - - if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, - pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || - pTableScanInfo->base.dataReader == NULL) { - qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid); + STableKeyInfo keyInfo = {.uid = uid}; + if (pScanBaseInfo->dataReader == NULL) { + int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); + if (code != TSDB_CODE_SUCCESS) { + qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); + terrno = code; return -1; } + } else { + tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); + int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; + + // let's start from the next ts that returned to consumer. + pScanBaseInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); + + // restore the key value + pScanBaseInfo->cond.twindows.skey = oldSkey; + pScanInfo->scanTimes = 0; + + qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", + uid, ts, pScanInfo->currentTable, numOfTables, id); + } + } else { + qError("invalid pOffset->type:%d, %s", pOffset->type, id); + return -1; + } + + } else { // subType == TOPIC_SUB_TYPE__TABLE/DB + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); + return -1; } - STableKeyInfo tki = {.uid = uid}; - tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1); - int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey; - pTableScanInfo->base.cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); - pTableScanInfo->base.cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; + SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; - qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, - ts, pTableScanInfo->currentTable, numOfTables); - } else { - qError("invalid pOffset->type:%d", pOffset->type); - return -1; + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); + tableListClear(pTableListInfo); + + if (mtInfo.uid == 0) { + return 0; // no data + } + + initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + +// if (pTableListInfo == NULL) { +// pTableListInfo = tableListCreate(); +// } + + tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); + + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + int32_t size = tableListGetSize(pTableListInfo); + + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); + + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); + strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + pTaskInfo->streamInfo.schema = mtInfo.schema; + + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); + return -1; + } + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + } else if (pOffset->type == TMQ_OFFSET__LOG) { + SStreamRawScanInfo* pInfo = pOperator->info; + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; + qDebug("tmqsnap qStreamPrepareScan snapshot log"); } - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); - return -1; - } - - SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); - tsdbReaderClose(pInfo->dataReader); - pInfo->dataReader = NULL; - - cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - tableListClear(pTaskInfo->pTableInfoList); - - if (mtInfo.uid == 0) { - return 0; // no data - } - - initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); - pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; - - if (pTaskInfo->pTableInfoList == NULL) { - pTaskInfo->pTableInfoList = tableListCreate(); - } - - tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); - - STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); - int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); - - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); - - cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - pTaskInfo->streamInfo.schema = mtInfo.schema; - - qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { - SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); - return -1; - } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); - } else if (pOffset->type == TMQ_OFFSET__LOG) { - SStreamRawScanInfo* pInfo = pOperator->info; - tsdbReaderClose(pInfo->dataReader); - pInfo->dataReader = NULL; - qDebug("tmqsnap qStreamPrepareScan snapshot log"); } + return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 24a26d575a..3a6ab4463a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1989,6 +1989,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + taosInitRWLatch(&pTaskInfo->lock); pTaskInfo->id.vgId = vgId; pTaskInfo->id.queryId = queryId; pTaskInfo->id.str = buildTaskId(taskId, queryId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dff1abb97..b2197017df 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -766,8 +766,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); - qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, - pInfo->currentTable, pTaskInfo->id.str); + qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, + pInfo->currentTable, numOfTables, pTaskInfo->id.str); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1569,19 +1569,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; + const char* id = GET_TASKID(pTaskInfo); - qDebug("start to exec queue scan"); + qDebug("start to exec queue scan, %s", id); if (pTaskInfo->streamInfo.submit.msgStr != NULL) { - if (pInfo->tqReader->msg2.msgStr == NULL) { - /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ - /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ - /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ - /*void* msgStr = pTaskInfo->streamInfo.*/ + if (pInfo->tqReader->msg2.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { - qError("submit msg messed up when initing stream submit block %p", submit.msgStr); + qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id); pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->setMsg = 0; ASSERT(0); @@ -1615,18 +1612,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, - pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, + pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id); pTaskInfo->streamInfo.returned = 1; return pResult; } else { + // no data has return already, try to extract data in the WAL if (!pTaskInfo->streamInfo.returned) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->base.dataReader); - qDebug("3"); + pTSInfo->base.dataReader = NULL; tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); - qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); + + qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; @@ -1643,7 +1642,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqNextBlock(pInfo->tqReader, &ret) < 0) { // if the end is reached, terrno is 0 if (terrno != 0) { - qError("failed to get next log block since %s", terrstr()); + qError("failed to get next log block since %s, %s", terrstr(), id); } } @@ -1658,9 +1657,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else if (ret.fetchType == FETCH_TYPE__META) { qError("unexpected ret.fetchType:%d", ret.fetchType); continue; - // pTaskInfo->streamInfo.lastStatus = ret.offset; - // pTaskInfo->streamInfo.metaBlk = ret.meta; - // return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE || (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; @@ -1672,7 +1668,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } } else { - qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); + qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id); return NULL; } } From f61732dca32c228f0167de961e80012b2a85ac10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Wed, 29 Mar 2023 16:56:53 +0800 Subject: [PATCH 02/23] test: refine query cases --- tests/system-test/2-query/columnLenUpdated.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py index d6940fde8b..ea01cd623c 100644 --- a/tests/system-test/2-query/columnLenUpdated.py +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -147,9 +147,9 @@ class TDTestCase: tdSql.checkData(1, 1, '55555') - tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))") + tdSql.query("create table stb (ts timestamp, f1 int, f2 binary(2)) tags (tg1 binary(2))") tdSql.query("create table tb1 using stb tags('bb')") - tdSql.query("insert into tb1 values (now, 2)") + tdSql.query("insert into tb1 values (now, 2,'22')") tdSql.query("select count(*) from stb group by tg1") tdSql.checkData(0, 0, 1) @@ -163,13 +163,23 @@ class TDTestCase: if retCode != "TAOS_OK": tdLog.exit("taos -s fail") - keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\"" + keyDict['s'] = "\"insert into db1.tb2 values (now, 2,'22')\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + keyDict['s'] = "\"alter table db1.stb modify column f2 binary(5) \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + keyDict['s'] = "\"insert into db1.tb2 values (now, 3,'55555')\"" retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '') if retCode != "TAOS_OK": tdLog.exit("taos -s fail") tdSql.query("select count(*) from stb group by tg1") - tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 0, 2) tdSql.checkData(1, 0, 1) From d998912ce8b8cf6b36838a71ff75b12cde2b06f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 17:11:41 +0800 Subject: [PATCH 03/23] fix(query): fix syntax error. --- source/libs/executor/src/executor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5b71cdcac9..6c4a4bbe88 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1120,7 +1120,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:%, %s" PRId64, pOffset->version + 1, id); + qError("tqSeekVer failed ver:%"PRId64", %s" PRId64, pOffset->version + 1, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { From b25b6a8333c9280b4a616a4b1d4dae348667f304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Wed, 29 Mar 2023 18:16:46 +0800 Subject: [PATCH 04/23] test: refine query cases --- tests/parallel_test/cases.task | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 260d47032a..a50503e24b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -14,8 +14,9 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 @@ -24,7 +25,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py From 3810e57c1542c4deb332d6c20cafda120a7c99ea Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 29 Mar 2023 19:10:10 +0800 Subject: [PATCH 05/23] fix: tag copy issue --- source/libs/executor/src/executil.c | 4 ++++ tests/parallel_test/cases.task | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 953d614951..0081ffcb0b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -571,6 +571,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis memcpy(pStart, data, len); pStart += len; } else if (IS_VAR_DATA_TYPE(pValue->info.type)) { + if (varDataTLen(data) > pValue->info.bytes) { + code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; + goto end; + } memcpy(pStart, data, varDataTLen(data)); pStart += varDataTLen(data); } else { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index a50503e24b..ab7540ee59 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -14,9 +14,9 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 @@ -25,7 +25,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py From 56c98d77684049c00481330460992b53a73cd3ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 19:35:04 +0800 Subject: [PATCH 06/23] refactor: do some internal refactor. --- include/libs/executor/executor.h | 3 ++- source/dnode/vnode/src/tq/tq.c | 7 +++--- source/dnode/vnode/src/tq/tqMeta.c | 10 ++++---- source/libs/executor/inc/executorimpl.h | 8 ++----- source/libs/executor/src/executor.c | 24 +++++++------------ source/libs/executor/src/executorimpl.c | 4 ++-- tests/system-test/2-query/columnLenUpdated.py | 2 +- 7 files changed, 23 insertions(+), 35 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 19a6407b77..8d63306a88 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); /** * set the task Id, usually used by message queue process @@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4db53c1627..484a0559a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -799,7 +799,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle tqHandle = {0}; pHandle = &tqHandle; - /*taosInitRWLatch(&pExec->lock);*/ uint64_t oldConsumerId = pHandle->consumerId; memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -834,7 +833,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL); + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); @@ -847,7 +846,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; @@ -865,7 +864,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index b6bca1e4ca..cb4be7a539 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL); +` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); if (handle.execHandle.task == NULL) { tqError("cannot create exec task for %s", handle.subKey); code = -1; @@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); @@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); + tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); @@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c269367eb2..0cb9955056 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -482,12 +482,6 @@ typedef struct SStreamScanInfo { } SStreamScanInfo; typedef struct { - // int8_t subType; - // bool withMeta; - // int64_t suid; - // int64_t snapVersion; - // void *metaInfo; - // void *dataInfo; SVnode* vnode; SSDataBlock pRes; // result SSDataBlock STsdbReader* dataReader; @@ -690,6 +684,8 @@ typedef struct SStreamFillOperatorInfo { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) +SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName); + SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); int32_t optrDummyOpenFn(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6c4a4bbe88..06813f0148 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -242,30 +242,28 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) { - if (msg == NULL) { - // create raw scan - SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) { + if (msg == NULL) { // create raw scan + SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, ""); if (NULL == pTaskInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - - pTaskInfo->cost.created = taosGetTimestampUs(); - pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo); if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pTaskInfo); return NULL; } + + qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo)); return pTaskInfo; } struct SSubplan* pPlan = NULL; - int32_t code = qStringToSubplan(msg, &pPlan); + + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 } } - if (pSchema) { - *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); - } return pTaskInfo; } @@ -1138,6 +1133,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; + pScanInfo->currentTable = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); @@ -1210,10 +1206,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; -// if (pTableListInfo == NULL) { -// pTableListInfo = tableListCreate(); -// } - tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3a6ab4463a..c2a068e1cb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) { return p; } -static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { +SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (pTaskInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in } setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->cost.created = taosGetTimestampUs(); pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); pTaskInfo->execModel = model; @@ -2465,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand goto _complete; } - (*pTaskInfo)->cost.created = taosGetTimestampUs(); return TSDB_CODE_SUCCESS; _complete: diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py index d6940fde8b..e4555b4e79 100644 --- a/tests/system-test/2-query/columnLenUpdated.py +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -83,7 +83,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) From c8ad465a0d18e2aa68e93d179325c7732ecd57be Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 19:40:34 +0800 Subject: [PATCH 07/23] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqMeta.c | 2 +- source/libs/executor/src/executor.c | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index cb4be7a539..a273f2edec 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = -` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); + qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); if (handle.execHandle.task == NULL) { tqError("cannot create exec task for %s", handle.subKey); code = -1; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 06813f0148..1b71a41586 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1098,7 +1098,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if(pOperator->numOfDownstream != 1){ + if (pOperator->numOfDownstream != 1) { qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); return -1; } @@ -1115,7 +1115,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:%"PRId64", %s" PRId64, pOffset->version + 1, id); + qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1123,6 +1123,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // those data are from the snapshot in tsdb, besides the data in the wal file. int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; + int32_t index = 0; // this value may be changed if new tables are created taosRLockLatch(&pTaskInfo->lock); @@ -1144,7 +1145,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { @@ -1183,12 +1184,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } - } else { // subType == TOPIC_SUB_TYPE__TABLE/DB + } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); + qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); return -1; } @@ -1218,7 +1219,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; - qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64" %s", mtInfo.uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -1226,12 +1227,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); return -1; } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; - qDebug("tmqsnap qStreamPrepareScan snapshot log"); + qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id); } } From 84d75c2ec09da7454ba83ab8d945ff46616f3126 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 29 Mar 2023 19:51:13 +0800 Subject: [PATCH 08/23] fix: join after interval --- source/libs/executor/src/timewindowoperator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b01143841c..70febcaf6a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -128,8 +128,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn if (end >= 0) { forwardRows = end; - if (pData[end + pos] == ekey) { + while (pData[end + pos] == ekey) { forwardRows += 1; + ++pos; } } } else { @@ -137,8 +138,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn if (end >= 0) { forwardRows = end; - if (pData[end + pos] == ekey) { + while (pData[end + pos] == ekey) { forwardRows += 1; + ++pos; } } // int32_t end = searchFn((char*)pData, pos + 1, ekey, order); From d9b3c638c3a2a7c5e299793faa1c57b8c3c9ced4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 Mar 2023 21:03:39 +0800 Subject: [PATCH 09/23] fix:add assert for debug --- source/common/src/tdataformat.c | 3 +++ source/libs/parser/src/parInsertUtil.c | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 34808aa389..9632750a18 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2453,6 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); if (code) goto _exit; } else { + if(ASSERT(varDataTLen(data + offset) <= bytes)){ + uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); + } code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset), varDataLen(data + offset)); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 132a3b2618..bbe36e0c80 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -656,6 +656,9 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate } else { pStart += colLength[j]; } + if(ASSERT(pCol->nVal == numOfRows)){ + uError("tFields is null, pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); + } } }else{ for (int i = 0; i < numFields; i++) { @@ -684,6 +687,9 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate } else { pStart += colLength[i]; } + if(ASSERT(pCol->nVal == numOfRows)){ + uError("tFields is not null, pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); + } boundInfo->pColIndex[j] = -1; break; } From c1b4f941997c039d0bfa7de110551c3c689843ca Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 Mar 2023 22:12:13 +0800 Subject: [PATCH 10/23] fix:add assert for debug --- source/common/src/tdataformat.c | 2 ++ source/libs/parser/src/parInsertUtil.c | 6 ------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 9632750a18..d6ab974c6c 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2455,6 +2455,8 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt } else { if(ASSERT(varDataTLen(data + offset) <= bytes)){ uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); + code = TSDB_CODE_INVALID_PARA; + goto _exit; } code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset), varDataLen(data + offset)); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index bbe36e0c80..132a3b2618 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -656,9 +656,6 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate } else { pStart += colLength[j]; } - if(ASSERT(pCol->nVal == numOfRows)){ - uError("tFields is null, pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - } } }else{ for (int i = 0; i < numFields; i++) { @@ -687,9 +684,6 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate } else { pStart += colLength[i]; } - if(ASSERT(pCol->nVal == numOfRows)){ - uError("tFields is not null, pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - } boundInfo->pColIndex[j] = -1; break; } From 5f4cb41e288740e69a8f69d139af8e11400d5d66 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 23:45:41 +0800 Subject: [PATCH 11/23] fix(tmq): set correct start offset value. --- source/client/src/clientTmq.c | 12 ++++++++---- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/executor/src/executor.c | 18 +++++++++++------- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 111ca28cdc..a16ddce0aa 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -535,10 +535,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN atomic_add_fetch_32(&pParamSet->totalRspNum, 1); SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); - tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64 - ", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, - tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, - pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId); + char offsetBuf[80] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val); + + char commitBuf[80] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->committedOffset); + tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, + tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, + totalVgroups, pMsgSendInfo->requestId); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 484a0559a8..d4bdd633e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -399,8 +399,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[80]; tFormatOffset(formatBuf, 80, pOffsetVal); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.", - consumerId, pHandle->subKey, vgId, formatBuf); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64, + consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); return 0; } else { // no poll occurs in this vnode for this topic, let's seek to the right offset value. diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 00695e14f4..85a62c4dd1 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId); + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); code = qExecTask(task, &pDataBlock, &ts); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1b71a41586..60463bad5f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1156,6 +1156,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } STableKeyInfo keyInfo = {.uid = uid}; + int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; + + // let's start from the next ts that returned to consumer. + pScanBaseInfo->cond.twindows.skey = ts + 1; + if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); @@ -1164,21 +1169,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT terrno = code; return -1; } + + qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s", + uid, ts, pScanInfo->currentTable, numOfTables, id); } else { tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); - int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; - - // let's start from the next ts that returned to consumer. - pScanBaseInfo->cond.twindows.skey = ts + 1; tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - - // restore the key value - pScanBaseInfo->cond.twindows.skey = oldSkey; pScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", uid, ts, pScanInfo->currentTable, numOfTables, id); } + + // restore the key value + pScanBaseInfo->cond.twindows.skey = oldSkey; } else { qError("invalid pOffset->type:%d, %s", pOffset->type, id); return -1; From 06aa7d3750c74193711797bffcf3445598beb325 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 30 Mar 2023 08:54:00 +0800 Subject: [PATCH 12/23] fix: add test case for interval after join --- tests/parallel_test/cases.task | 1 + tests/script/tsim/query/join_interval.sim | 42 +++++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 tests/script/tsim/query/join_interval.sim diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 260d47032a..cf2d62a2c3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -868,6 +868,7 @@ ,,y,script,./test.sh -f tsim/query/session.sim ,,y,script,./test.sh -f tsim/query/udf.sim ,,y,script,./test.sh -f tsim/query/udf_with_const.sim +,,y,script,./test.sh -f tsim/query/join_interval.sim ,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/event.sim diff --git a/tests/script/tsim/query/join_interval.sim b/tests/script/tsim/query/join_interval.sim new file mode 100644 index 0000000000..14994a5cc1 --- /dev/null +++ b/tests/script/tsim/query/join_interval.sim @@ -0,0 +1,42 @@ + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c udf -v 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== step create databases +sql create database d1 +sql create database d2 +sql create table d1.t1(ts timestamp, i int) tags(t int); +sql create table d2.t1(ts timestamp, i int); +sql insert into d1.t11 using d1.t1 tags(1) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4) +sql insert into d1.t12 using d1.t1 tags(2) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4) +sql insert into d1.t13 using d1.t1 tags(3) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4) + +sql insert into d2.t1 values(1500000000000,0)(1500000000001,1)(1500000000002,2) + +sql select _wstart,_wend,count((a.ts)),count(b.ts) from d1.t1 a, d2.t1 b where a.ts is not null and a.ts = b.ts interval(1a) ; +if $data02 != 3 then + return -1 +endi + +if $data03 != 3 then + return -1 +endi + +if $data12 != 3 then + return -1 +endi + +if $data13 != 3 then + return -1 +endi +if $data22 != 3 then + return -1 +endi + +if $data23 != 3 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGINT From f93efc8538f4c4eeebc8d5b0ae93492776b0ac7a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Mar 2023 09:21:34 +0800 Subject: [PATCH 13/23] fix:telemetry.py failed in windows --- tests/system-test/0-others/telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/telemetry.py b/tests/system-test/0-others/telemetry.py index bc5d276faa..c62e3c2487 100644 --- a/tests/system-test/0-others/telemetry.py +++ b/tests/system-test/0-others/telemetry.py @@ -181,7 +181,7 @@ class TDTestCase: def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() # time.sleep(2) - vgroups = "30" + vgroups = "8" sql = "create database db3 vgroups " + vgroups tdSql.query(sql) From 347c08bfeac0fe28c28adf46e300785e81d81ba3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:17:52 +0800 Subject: [PATCH 14/23] test: refine query cases --- tests/parallel_test/cases.task | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ab7540ee59..fcece56816 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -10,22 +10,21 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py @@ -47,6 +46,11 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 From 2e8e5a98e9125e3729c4956266c19289b3019470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:18:05 +0800 Subject: [PATCH 15/23] test: refine query cases --- tests/system-test/2-query/nestedQuery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/2-query/nestedQuery.py b/tests/system-test/2-query/nestedQuery.py index 6557aad05f..1b843defce 100755 --- a/tests/system-test/2-query/nestedQuery.py +++ b/tests/system-test/2-query/nestedQuery.py @@ -6144,7 +6144,7 @@ class TDTestCase: startTime = time.time() - self.function_before_26() + #self.function_before_26() self.math_nest(['UNIQUE']) self.math_nest(['MODE']) @@ -6157,9 +6157,9 @@ class TDTestCase: # self.math_nest(['MAVG']) # self.math_nest(['HYPERLOGLOG']) # self.math_nest(['TAIL']) - # self.math_nest(['CSUM']) - # self.math_nest(['statecount','stateduration']) - # self.math_nest(['HISTOGRAM']) + self.math_nest(['CSUM']) + self.math_nest(['statecount','stateduration']) + self.math_nest(['HISTOGRAM']) # self.str_nest(['LTRIM','RTRIM','LOWER','UPPER']) # self.str_nest(['LENGTH','CHAR_LENGTH']) From 01ae6da8a45c2d2e1af2fecb1d2d9d33e687e062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:18:22 +0800 Subject: [PATCH 16/23] test: refine query cases --- tests/system-test/2-query/nestedQuery_math.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/2-query/nestedQuery_math.py b/tests/system-test/2-query/nestedQuery_math.py index 2d0bbcb352..3e37a2c15e 100755 --- a/tests/system-test/2-query/nestedQuery_math.py +++ b/tests/system-test/2-query/nestedQuery_math.py @@ -34,9 +34,9 @@ class TDTestCase(TDTestCase): self.math_nest(['MAVG']) self.math_nest(['HYPERLOGLOG']) self.math_nest(['TAIL']) - self.math_nest(['CSUM']) - self.math_nest(['statecount','stateduration']) - self.math_nest(['HISTOGRAM']) + # self.math_nest(['CSUM']) + # self.math_nest(['statecount','stateduration']) + # self.math_nest(['HISTOGRAM']) # self.str_nest(['LTRIM','RTRIM','LOWER','UPPER']) # self.str_nest(['LENGTH','CHAR_LENGTH']) From 8825b8931441684d7ee070629ea4df0babd52df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:18:36 +0800 Subject: [PATCH 17/23] test: refine query cases --- tests/system-test/2-query/out_of_order.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/system-test/2-query/out_of_order.py b/tests/system-test/2-query/out_of_order.py index 148b89fc58..47a9cc3c11 100644 --- a/tests/system-test/2-query/out_of_order.py +++ b/tests/system-test/2-query/out_of_order.py @@ -162,19 +162,18 @@ class TDTestCase: sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname tdSql.query(sql) - num = tdSql.getData(0,0) + # 目前不需要了 + # num = tdSql.getData(0,0) - for i in range(0,num): - sql1 = "select count(*) from %s.d%d" %(dbname,i) - tdSql.query(sql1) - sql1_result = tdSql.getData(0,0) - tdLog.info("sql:%s , result: %s" %(sql1,sql1_result)) + # for i in range(0,num): + # sql1 = "select count(*) from %s.d%d" %(dbname,i) + # tdSql.query(sql1) + # sql1_result = tdSql.getData(0,0) + # tdLog.info("sql:%s , result: %s" %(sql1,sql1_result)) + def check_out_of_order(self,dbname,tables,per_table_num,order,replica): self.run_benchmark(dbname,tables,per_table_num,order,replica) - print("sleep 10 seconds") - #time.sleep(10) - print("sleep 10 seconds finish") self.run_sql(dbname) From cf99c2e69dc207fd80e81dee5cf0872b7739004a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:18:53 +0800 Subject: [PATCH 18/23] test: refine query cases --- tests/system-test/2-query/nestedQuery_26.py | 76 +++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100755 tests/system-test/2-query/nestedQuery_26.py diff --git a/tests/system-test/2-query/nestedQuery_26.py b/tests/system-test/2-query/nestedQuery_26.py new file mode 100755 index 0000000000..9d5f31d1e0 --- /dev/null +++ b/tests/system-test/2-query/nestedQuery_26.py @@ -0,0 +1,76 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +from util.cases import tdCases +from .nestedQuery import * + +class TDTestCase(TDTestCase): + + + def run(self): + tdSql.prepare() + + startTime = time.time() + + self.function_before_26() + + # self.math_nest(['UNIQUE']) + # self.math_nest(['MODE']) + # self.math_nest(['SAMPLE']) + + # self.math_nest(['ABS','SQRT']) + # self.math_nest(['SIN','COS','TAN','ASIN','ACOS','ATAN']) + # self.math_nest(['POW','LOG']) + # self.math_nest(['FLOOR','CEIL','ROUND']) + # self.math_nest(['MAVG']) + # self.math_nest(['HYPERLOGLOG']) + # self.math_nest(['TAIL']) + # self.math_nest(['CSUM']) + # self.math_nest(['statecount','stateduration']) + # self.math_nest(['HISTOGRAM']) + + # self.str_nest(['LTRIM','RTRIM','LOWER','UPPER']) + # self.str_nest(['LENGTH','CHAR_LENGTH']) + # self.str_nest(['SUBSTR']) + # self.str_nest(['CONCAT']) + # self.str_nest(['CONCAT_WS']) + # self.time_nest(['CAST']) #放到time里起来弄 + # self.time_nest(['CAST_1']) + # self.time_nest(['CAST_2']) + # self.time_nest(['CAST_3']) + # self.time_nest(['CAST_4']) + + + + # self.time_nest(['NOW','TODAY']) + # self.time_nest(['TIMEZONE']) + # self.time_nest(['TIMETRUNCATE']) + # self.time_nest(['TO_ISO8601']) + # self.time_nest(['TO_UNIXTIMESTAMP']) + # self.time_nest(['ELAPSED']) + #self.time_nest(['TIMEDIFF_1']) + #self.time_nest(['TIMEDIFF_2']) + + + endTime = time.time() + print("total time %ds" % (endTime - startTime)) + + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From 2aeda10174be4cfb674009683710db6b93220843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 30 Mar 2023 10:22:17 +0800 Subject: [PATCH 19/23] test: refine query cases --- tests/system-test/2-query/out_of_order.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/out_of_order.py b/tests/system-test/2-query/out_of_order.py index 47a9cc3c11..ff2b71193b 100644 --- a/tests/system-test/2-query/out_of_order.py +++ b/tests/system-test/2-query/out_of_order.py @@ -181,7 +181,7 @@ class TDTestCase: startTime = time.time() #self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1) - self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1) + self.check_out_of_order('db1',random.randint(50,100),random.randint(10000,20000),random.randint(1,5),1) # self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1) From 07171a01b4da0962919ddf8dca19e31181655520 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Mar 2023 10:52:39 +0800 Subject: [PATCH 20/23] fix:taosdMonitor.py failed in windows --- tests/system-test/0-others/taosdMonitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py index 944ff52d5b..195f1ba5bc 100644 --- a/tests/system-test/0-others/taosdMonitor.py +++ b/tests/system-test/0-others/taosdMonitor.py @@ -292,7 +292,7 @@ class TDTestCase: def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() # time.sleep(2) - vgroups = "30" + vgroups = "8" sql = "create database db3 vgroups " + vgroups tdSql.query(sql) sql = "create table db3.stb (ts timestamp, f int) tags (t int)" From 8f3c41fc709d38a9735d644fa8cfd1759fcfc65a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 30 Mar 2023 11:03:40 +0800 Subject: [PATCH 21/23] fix: add more column update cases --- tests/system-test/2-query/columnLenUpdated.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py index ea01cd623c..3b87cbe22a 100644 --- a/tests/system-test/2-query/columnLenUpdated.py +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -147,6 +147,30 @@ class TDTestCase: tdSql.checkData(1, 1, '55555') + + keyDict['s'] = "\"alter table db1.tba add column f2 binary(5) \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + tdSql.query("select * from tba order by ts") + tdSql.query("select * from tba order by ts") + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 2, None) + + + + + keyDict['s'] = "\"alter table db1.tba add column f3 binary(5) \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + tdSql.query("select f3 from tba order by ts") + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + + tdSql.query("create table stb (ts timestamp, f1 int, f2 binary(2)) tags (tg1 binary(2))") tdSql.query("create table tb1 using stb tags('bb')") tdSql.query("insert into tb1 values (now, 2,'22')") From e1d85ce0747f465d79f49ffd07ddee89de854f96 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Mar 2023 12:04:47 +0800 Subject: [PATCH 22/23] fix(tmq): fix error. --- source/libs/executor/src/executor.c | 18 ++++++++++-------- source/libs/executor/src/scanoperator.c | 3 ++- tests/system-test/7-tmq/subscribeDb.py | 4 ++-- .../7-tmq/tmqConsFromTsdb-mutilVg.py | 2 ++ 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60463bad5f..656ffda0ca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1145,13 +1145,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start + // position, let's find it from the beginning. + index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { pScanInfo->currentTable = index; } else { - qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id); + qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, + numOfTables, pScanInfo->currentTable, id); return -1; } @@ -1160,25 +1163,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's start from the next ts that returned to consumer. pScanBaseInfo->cond.twindows.skey = ts + 1; + pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, - pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; return -1; } - qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - pScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, ts, pScanInfo->currentTable, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c13366c560..84317c825b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -779,13 +779,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; if (pInfo->currentTable >= numOfTables) { + qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); return NULL; } STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, - pInfo->currentTable, numOfTables, pTaskInfo->id.str); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 0fa9bcfbd4..c47c218891 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -13,11 +13,11 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - #rpcDebugFlagVal = '143' + rpcDebugFlagVal = '143' #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1): diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py index 87832ac0ef..11fc7dbcc0 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -16,6 +16,8 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {"tsdbDebugFlag":135} + def __init__(self): self.vgroups = 4 self.ctbNum = 10 From 71d2620259e2313e0cf3446d21f182a48653096b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Mar 2023 12:58:52 +0800 Subject: [PATCH 23/23] other: update the test case. --- tests/system-test/7-tmq/subscribeDb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index c47c218891..41db943f83 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -13,11 +13,11 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - rpcDebugFlagVal = '143' + # rpcDebugFlagVal = '143' #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + # updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1):