diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 3433f98d38..04bc839d51 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -36,6 +36,7 @@ extern "C" { #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 +#define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 #define STREAM_STATE_BUFF_HASH 1 diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 50e3dd0d03..55317d03af 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1508,6 +1508,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } } + pBlock->info.rows = numOfRows; + destroyStreamTaskIter(pIter); taosRUnLockLatch(&pStream->lock); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 2bf73198aa..cc7ae03483 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -159,7 +159,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderDoInit(&mr, (SMeta *)meta, 0); + metaReaderDoInit(&mr, (SMeta *)meta, META_READER_LOCK); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -174,7 +174,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int code = 0; SMetaReader mr = {0}; - metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK); SMetaReader *pReader = &mr; @@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) { int code = 0; SMetaReader mr = {0}; - metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK); code = metaGetTableEntryByName(&mr, tbName); if (code == 0) *tbType = mr.me.type; @@ -215,7 +215,7 @@ int metaReadNext(SMetaReader *pReader) { int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) { int code = -1; SMetaReader mr = {0}; - metaReaderDoInit(&mr, (SMeta *)meta, 0); + metaReaderDoInit(&mr, (SMeta *)meta, META_READER_LOCK); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { goto _exit; @@ -275,7 +275,7 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) { } void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) { if (pTbCur->paused) { - metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, 0); + metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK); tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); @@ -820,7 +820,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { } SMetaReader mr = {0}; - metaReaderDoInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, META_READER_LOCK); int64_t smaId; int smaIdx = 0; STSma *pTSma = NULL; @@ -875,7 +875,7 @@ _err: STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma *pTSma = NULL; SMetaReader mr = {0}; - metaReaderDoInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, META_READER_LOCK); if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) { metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid); metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 91704f5c7a..c61725e834 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -37,7 +37,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { // validate req // save smaIndex - metaReaderDoInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, META_READER_LOCK); if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) { #if 1 terrno = TSDB_CODE_TSMA_ALREADY_EXIST; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4a978c8f41..3c032f193a 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -819,7 +819,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs } // validate req - metaReaderDoInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, META_READER_LOCK); if (metaGetTableEntryByName(&mr, pReq->name) == 0) { if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) { terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 621651507e..5441d0c4c1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1097,7 +1097,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { } int64_t nRsmaTables = 0; - metaReaderDoInit(&mr, SMA_META(pSma), 0); + metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK); if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 110cac152d..4d6fb11747 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -115,7 +115,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { } SMetaReader mr = {0}; - metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0); + metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK); if (metaGetTableEntryByName(&mr, req.tbName) < 0) { metaReaderClear(&mr); @@ -666,20 +666,19 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* int32_t targetIdx = 0; int32_t sourceIdx = 0; while (targetIdx < colActual) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + if (sourceIdx >= numOfCols) { - tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); - return -1; + tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); + colDataSetNNULL(pColData, 0, numOfRows); + targetIdx++; + continue; } SColData* pCol = taosArrayGet(pCols, sourceIdx); - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); SColVal colVal; - if (pCol->nVal != numOfRows) { - tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - return -1; - } - + tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId); if (pCol->cid < pColData->info.colId) { sourceIdx++; } else if (pCol->cid == pColData->info.colId) { @@ -693,7 +692,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* sourceIdx++; targetIdx++; } else { - colDataSetNNULL(pColData, 0, pCol->nVal); + colDataSetNNULL(pColData, 0, numOfRows); targetIdx++; } } @@ -712,9 +711,6 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* SColVal colVal; tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { - // tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in - // schema:%d", - // sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 103007eb57..9940164ee2 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -47,7 +47,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; - metaReaderDoInit(&mr, pTq->pVnode->pMeta, 0); + metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK); // TODO add reference to gurantee success if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9ff4e3ba62..5374b9aa78 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -612,7 +612,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI // wait for the table to be created SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); + metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); int32_t code = metaGetTableEntryByName(&mr, dstTableName); if (code == 0) { // table already exists, check its type and uid @@ -705,7 +705,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have // randomly generated, but false table uid in the WAL. SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); + metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); // table not in cache, let's try the extract it from tsdb meta if (metaGetTableEntryByName(&mr, dstTableName) < 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 0848fd0076..37f82ef873 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2014,7 +2014,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { tb_uid_t suid = 0; SMetaReader mr = {0}; - metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, 0); + metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK); if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { metaReaderClear(&mr); // table not esist return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d740f9491c..c0ad32d269 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5036,7 +5036,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; - metaReaderDoInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, META_READER_LOCK); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d1c811858a..d244d5b6bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -63,7 +63,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } // query meta - metaReaderDoInit(&mer1, pVnode->pMeta, 0); + metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { code = terrno; @@ -179,7 +179,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } // query meta - metaReaderDoInit(&mer1, pVnode->pMeta, 0); + metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) { code = terrno; @@ -192,7 +192,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { code = TSDB_CODE_VND_HASH_MISMATCH; goto _exit; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderDoInit(&mer2, pVnode->pMeta, 0); + metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK); if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(cfgRsp.stbName, mer2.me.name); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bb89fb587b..9a480359ed 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -308,7 +308,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, int32_t code = TSDB_CODE_SUCCESS; SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, metaHandle, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, metaHandle, META_READER_LOCK, &pAPI->metaFn); code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid); if (TSDB_CODE_SUCCESS != code) { pAPI->metaReaderFn.clearReader(&mr); @@ -1225,7 +1225,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, SStorageAPI* pAPI) { SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pVnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn); if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) { // table not exist pAPI->metaReaderFn.clearReader(&mr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 831fd4e883..26a80cc6b5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -351,7 +351,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); for (int32_t i = 0; i < numOfUids; ++i) { uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 9eb1c8d653..0fff5fa649 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -127,7 +127,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo SStorageAPI* pAPI = &pTaskInfo->storageAPI; - pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 51edfcb42c..a6613e589a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -505,7 +505,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn); code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed. @@ -534,7 +534,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn); code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -2084,6 +2084,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; setBlockIntoRes(pInfo, pRes, &defaultWindow, true); + qDebug("doQueueScan after filter get data from log %" PRId64 " rows, version:%" PRId64, pInfo->pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } @@ -3318,7 +3320,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index c38e8dfe5a..5735ee298b 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -465,7 +465,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->req.filterTb); SMetaReader smrTable = {0}; - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -485,7 +485,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { if (smrTable.me.type == TSDB_CHILD_TABLE) { int64_t suid = smrTable.me.ctbEntry.suid; pAPI->metaReaderFn.clearReader(&smrTable); - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -568,7 +568,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { schemaRow = *(SSchemaWrapper**)schema; } else { SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -657,7 +657,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, condTableName); SMetaReader smrChildTable = {0}; - pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrChildTable, condTableName); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -714,7 +714,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1163,7 +1163,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { tb_uid_t* uid = taosArrayGet(pIdx->uids, i); SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid); if (ret < 0) { pAPI->metaReaderFn.clearReader(&mr); @@ -2204,7 +2204,7 @@ static int32_t doGetTableRowSize(SReadHandle* pHandle, uint64_t uid, int32_t* ro *rowLen = 0; SMetaReader mr = {0}; - pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn); int32_t code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7125879bab..851fe2679b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -229,6 +229,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 0330454b73..94e9babf3c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -16,6 +16,7 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmq_ts4563.py b/tests/system-test/7-tmq/tmq_ts4563.py new file mode 100644 index 0000000000..fc1cc259ce --- /dev/null +++ b/tests/system-test/7-tmq/tmq_ts4563.py @@ -0,0 +1,149 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +from taos import * + +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def consumeTest_TS_4563(self): + tdSql.execute(f'use db_stmt') + + tdSql.query("select ts,k from st") + tdSql.checkRows(2) + + tdSql.execute(f'create topic t_unorder_data as select ts,k from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t_unorder_data"]) + except TmqError: + tdLog.exit(f"subscribe error") + + cnt = 0 + try: + while True: + res = consumer.poll(1) + print(res) + if not res: + if cnt == 0: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + for block in val: + cnt += len(block.fetchall()) + + if cnt != 2: + tdLog.exit("consume error") + + finally: + consumer.close() + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def newcon(self,host,cfg): + user = "root" + password = "taosdata" + port =6030 + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + print(con) + return con + + def test_stmt_insert_multi(self,conn): + # type: (TaosConnection) -> None + + dbname = "db_stmt" + try: + conn.execute("drop database if exists %s" % dbname) + conn.execute("create database if not exists %s" % dbname) + conn.select_db(dbname) + + conn.execute( + "create table st(ts timestamp, i int, j int, k int)", + ) + # conn.load_table_info("log") + tdLog.debug("statement start") + start = datetime.now() + stmt = conn.statement("insert into st(ts,j) values(?, ?)") + + params = new_multi_binds(2) + params[0].timestamp((1626861392589, 1626861392590)) + params[1].int([3, None]) + + # print(type(stmt)) + tdLog.debug("bind_param_batch start") + stmt.bind_param_batch(params) + tdLog.debug("bind_param_batch end") + stmt.execute() + tdLog.debug("execute end") + end = datetime.now() + print("elapsed time: ", end - start) + assert stmt.affected_rows == 2 + tdLog.debug("close start") + + stmt.close() + + # conn.execute("drop database if exists %s" % dbname) + conn.close() + + except Exception as err: + # conn.execute("drop database if exists %s" % dbname) + conn.close() + raise err + + def run(self): + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + host="localhost" + connectstmt=self.newcon(host,config) + self.test_stmt_insert_multi(connectstmt) + self.consumeTest_TS_4563() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 5c71396ea9..8c78a94de6 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1152,7 +1152,7 @@ void shellSourceFile(const char *file) { bool shellGetGrantInfo(char* buf) { bool community = true; - char sinfo[1024] = {0}; + char sinfo[256] = {0}; tstrncpy(sinfo, taos_get_server_info(shell.conn), sizeof(sinfo)); strtok(sinfo, "\r\n"); @@ -1368,7 +1368,7 @@ int32_t shellExecute() { #ifdef WEBSOCKET if (!shell.args.restful && !shell.args.cloud) { #endif -char buf[512] = ""; +char* buf = taosMemoryMalloc(512); bool community = shellGetGrantInfo(buf); #ifndef WINDOWS printfIntroduction(community); @@ -1383,6 +1383,7 @@ bool community = shellGetGrantInfo(buf); if(!community) { printf("%s\n", buf); } +taosMemoryFree(buf); #ifdef WEBSOCKET }