diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9e928a79ac..51a714c792 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -185,7 +185,7 @@ typedef struct SBlockID { typedef struct SDataBlockInfo { STimeWindow window; int32_t rowSize; - int32_t rows; // todo hide this attribute + int64_t rows; // todo hide this attribute uint32_t capacity; SBlockID id; int16_t hasVarCol; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index fb6ef26a8a..aa5c78195a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -112,7 +112,7 @@ typedef struct SResultDataInfo { typedef struct SInputColumnInfoData { int32_t totalRows; // total rows in current columnar data int32_t startRowIndex; // handle started row index - int32_t numOfRows; // the number of rows needs to be handled + int64_t numOfRows; // the number of rows needs to be handled int32_t numOfInputCols; // PTS is not included bool colDataSMAIsSet; // if agg is set or not SColumnInfoData *pPTS; // primary timestamp column diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 6e2b650b5b..cac8f73042 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -982,7 +982,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn); int64_t p1 = taosGetTimestampUs(); - uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows); + uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows); return TSDB_CODE_SUCCESS; } else { // var data type @@ -1189,7 +1189,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { void blockDataEmpty(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; - if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) { + if (pInfo->capacity == 0) { return; } @@ -1748,14 +1748,14 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { int64_t tbUid = pBlock->info.id.uid; int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int16_t hasVarCol = pBlock->info.hasVarCol; - int32_t rows = pBlock->info.rows; + int64_t rows = pBlock->info.rows; int32_t sz = taosArrayGetSize(pBlock->pDataBlock); int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, tbUid); tlen += taosEncodeFixedI16(buf, numOfCols); tlen += taosEncodeFixedI16(buf, hasVarCol); - tlen += taosEncodeFixedI32(buf, rows); + tlen += taosEncodeFixedI64(buf, rows); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); @@ -1786,7 +1786,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid); buf = taosDecodeFixedI16(buf, &numOfCols); buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); - buf = taosDecodeFixedI32(buf, &pBlock->info.rows); + buf = taosDecodeFixedI64(buf, &pBlock->info.rows); buf = taosDecodeFixedI32(buf, &sz); pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); for (int32_t i = 0; i < sz; i++) { @@ -1990,7 +1990,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t len = 0; len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 - "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", + "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7864d80a38..9636dc2872 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); + SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderClose(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 452b1f6c0b..0c4ada2cb1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum); + // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); // tsdbFile.c ============================================================================================== diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3ed1b083e4..c75c675ec3 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -669,7 +669,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid, + smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid, output->info.id.groupId, output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7f9563ae5f..90ff1f8a84 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -340,7 +340,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; - tqDebug("return data rows %d", ret->data.info.rows); + tqDebug("return data rows %" PRId64, ret->data.info.rows); return 0; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 85a62c4dd1..c097cac015 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -114,7 +114,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs pRsp->blockNum++; - tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId, + tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%" PRId64 ", total blocks:%d", vgId, pHandle->consumerId, pDataBlock->info.rows, pRsp->blockNum); if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d34af9acae..d0ff403bf7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -282,6 +282,40 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { return true; } +int64_t tsdbCountTbDataRows(STbData *pTbData) { + SMemSkipListNode *pNode = pTbData->sl.pHead; + int64_t rowsNum = 0; + + while (NULL != pNode) { + pNode = SL_GET_NODE_FORWARD(pNode, 0); + if (pNode == pTbData->sl.pTail) { + return rowsNum; + } + + rowsNum++; + } + + return rowsNum; +} + +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { + taosRLockLatch(&pMemTable->latch); + for (int32_t i = 0; i < pMemTable->nBucket; ++i) { + STbData *pTbData = pMemTable->aBucket[i]; + while (pTbData) { + void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + if (p == NULL) { + pTbData = pTbData->next; + continue; + } + + *rowsNum += tsdbCountTbDataRows(pTbData); + pTbData = pTbData->next; + } + } + taosRUnLockLatch(&pMemTable->latch); +} + static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { int32_t code = 0; @@ -787,4 +821,4 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { _exit: return aTbDataP; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c0d54ea87a..99e2e620e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -25,6 +25,11 @@ typedef enum { EXTERNAL_ROWS_NEXT = 0x3, } EContentData; +typedef enum { + READ_MODE_COUNT_ONLY = 0x1, + READ_MODE_ALL, +} EReadMode; + typedef struct { STbDataIter* iter; int32_t index; @@ -168,6 +173,8 @@ struct STsdbReader { uint64_t suid; int16_t order; bool freeBlock; + EReadMode readMode; + uint64_t rowsNum; STimeWindow window; // the primary query time window that applies to all queries SSDataBlock* pResBlock; int32_t capacity; @@ -1777,7 +1784,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* setComposedBlockFlag(pReader, true); double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 + tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 " - %" PRId64 ", uid:%" PRIu64 ", %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid, pReader->idStr); @@ -2770,7 +2777,7 @@ _end: if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%" PRId64 ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3022,7 +3029,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%" PRId64 ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -3106,7 +3113,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%" PRId64 ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3132,6 +3139,151 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } + +static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { + int64_t st = taosGetTimestampUs(); + LRUHandle* handle = NULL; + int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle); + if (code != TSDB_CODE_SUCCESS || handle == NULL) { + goto _end; + } + + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + + SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); + size_t num = taosArrayGetSize(aBlockIdx); + if (num == 0) { + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); + return TSDB_CODE_SUCCESS; + } + + SBlockIdx* pBlockIdx = NULL; + int32_t i = 0; + for (int32_t i = 0; i < num; ++i) { + pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); + if (pBlockIdx->suid != pReader->suid) { + continue; + } + + STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid)); + if (p == NULL) { + continue; + } + + STableBlockScanInfo *pScanInfo = *p; + tMapDataReset(&pScanInfo->mapData); + tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); + + SDataBlk block = {0}; + for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { + tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); + pReader->rowsNum += block.nRow; + } + } + +_end: + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); + return code; +} + + +static int32_t doSumSttBlockRows(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockLoadInfo* pBlockLoadInfo = NULL; + + for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file + pBlockLoadInfo = &pLastBlockReader->pInfo[i]; + + code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); + if (code) { + return code; + } + + size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); + if (size >= 1) { + SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); + SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); + + // all identical + if (pStart->suid == pEnd->suid) { + if (pStart->suid != pReader->suid) { + // no qualified stt block existed + taosArrayClear(pBlockLoadInfo->aSttBlk); + continue; + } + for (int32_t i = 0; i < size; ++i) { + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + pReader->rowsNum += p->nRow; + } + } else { + for (int32_t i = 0; i < size; ++i) { + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + uint64_t s = p->suid; + if (s < pReader->suid) { + continue; + } + + if (s == pReader->suid) { + pReader->rowsNum += p->nRow; + } else if (s > pReader->suid) { + break; + } + } + } + } + } + + return code; +} + +static int32_t readRowsCountFromFiles(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + + while (1) { + bool hasNext = false; + int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext); + if (code) { + return code; + } + + if (!hasNext) { // no data files on disk + break; + } + + code = doSumFileBlockRows(pReader, pReader->pFileReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doSumSttBlockRows(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + pReader->status.loadFromFile = false; + + return code; +} + +static int32_t readRowsCountFromMem(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t memNum = 0, imemNum = 0; + if (pReader->pReadSnap->pMem != NULL) { + tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum); + } + + if (pReader->pReadSnap->pIMem != NULL) { + tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum); + } + + pReader->rowsNum += memNum + imemNum; + + return code; +} + + static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -4072,6 +4224,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; + } else if (READ_MODE_COUNT_ONLY == pReader->readMode) { + // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); } @@ -4090,7 +4244,7 @@ static void freeSchemaFunc(void* param) { // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { + SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) { STimeWindow window = pCond->twindows; if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { pCond->twindows.skey += 1; @@ -4190,6 +4344,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pReader->suspended = true; + if (countOnly) { + pReader->readMode = READ_MODE_COUNT_ONLY; + } + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -4490,6 +4648,33 @@ _err: return code; } +static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->pResBlock; + + if (pReader->status.loadFromFile == false) { + return false; + } + + code = readRowsCountFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + code = readRowsCountFromMem(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + pBlock->info.rows = pReader->rowsNum; + pBlock->info.id.uid = 0; + pBlock->info.dataLoad = 0; + + pReader->rowsNum = 0; + + return pBlock->info.rows > 0; +} + static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { int32_t code = TSDB_CODE_SUCCESS; @@ -4504,6 +4689,10 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { return code; } + if (READ_MODE_COUNT_ONLY == pReader->readMode) { + return tsdbReadRowsCountOnly(pReader); + } + if (pStatus->loadFromFile) { code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0cb9955056..4eceefcd51 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -336,6 +336,7 @@ typedef struct STableScanInfo { int8_t scanMode; int8_t assignBlockUid; bool hasGroupByTag; + bool countOnly; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 656ffda0ca..eed4476d2c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1167,7 +1167,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, - pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id); + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; @@ -1218,7 +1218,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); int32_t size = tableListGetSize(pTableListInfo); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c2a068e1cb..8068590dad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1201,7 +1201,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows); - qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", + qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s", (pRow->numOfRows+pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo)); // todo set the pOperator->resultInfo size @@ -1214,7 +1214,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS pBlock->info.rows += pRow->numOfRows; } - qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, + qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.id.groupId); pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4e2e105d14..c943270df9 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -271,7 +271,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { return NULL; } } - qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, + qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); setOperatorCompleted(pOperator); break; @@ -337,7 +337,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { - qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status); + qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status); break; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84317c825b..7e1b97b2e9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,6 +31,9 @@ #include "thash.h" #include "ttypes.h" +int32_t scanDebug = 0; + + #define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -308,14 +311,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca taosMemoryFreeClear(pBlock->pBlockAgg); if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; pCost->totalRows += pBlock->info.rows; tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%" PRIu64, GET_TASKID(pTaskInfo), + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; @@ -326,7 +329,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca loadSMA = true; // mark the operation of load sma; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead - qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); tsdbReleaseDataBlock(pTableScanInfo->dataReader); @@ -346,7 +349,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca size_t size = taosArrayGetSize(pBlock->pDataBlock); bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows); if (!keep) { - qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; (*status) = FUNC_DATA_REQUIRED_FILTEROUT; @@ -363,7 +366,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca // try to filter data block according to current results doDynamicPruneDataBlock(pOperator, pBlockInfo, status); if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->skipBlocks += 1; tsdbReleaseDataBlock(pTableScanInfo->dataReader); @@ -394,7 +397,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); } else { qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); @@ -672,9 +675,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - ASSERT(pBlock->info.id.uid != 0); - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); - + if (pBlock->info.id.uid) { + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + } + uint32_t status = 0; int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { @@ -698,7 +702,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; - ASSERT(pBlock->info.id.uid != 0); return pBlock; } return NULL; @@ -804,7 +807,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ASSERT(pInfo->base.dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, - (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -816,7 +819,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - ASSERT(result->info.id.uid != 0); return result; } @@ -936,6 +938,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } + if (scanDebug) { + pInfo->countOnly = true; + } + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo); @@ -1027,7 +1033,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, - (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); + (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -1049,7 +1055,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } tsdbReaderClose(pReader); - qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 + qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); @@ -1639,7 +1645,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, + qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id); pTaskInfo->streamInfo.returned = 1; return pResult; @@ -1678,7 +1684,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { setBlockIntoRes(pInfo, &ret.data, true); if (pInfo->pRes->info.rows > 0) { pOperator->status = OP_EXEC_RECV; - qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); + qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows); return pInfo->pRes; } } else if (ret.fetchType == FETCH_TYPE__META) { @@ -1865,7 +1871,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); return pInfo->pCreateTbRes; } - qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); + qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows); printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } @@ -2094,7 +2100,7 @@ FETCH_NEXT_BLOCK: pOperator->resultInfo.totalRows += pBlockInfo->rows; // printDataBlock(pInfo->pRes, "stream scan"); - qDebug("scan rows: %d", pBlockInfo->rows); + qDebug("scan rows: %" PRId64 , pBlockInfo->rows); if (pBlockInfo->rows > 0) { return pInfo->pRes; } @@ -2635,7 +2641,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { - code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); + code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2862,7 +2868,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); - qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, + qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); return (pResBlock->info.rows > 0) ? pResBlock : NULL; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 90c7fa10ca..cb0f1aa068 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -698,7 +698,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData pDataBlock->info.dataLoad = 1; } - qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, + qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, pDataBlock->info.rows); return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 00a90cc108..68e5cf6ef8 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0); - code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str); + code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false); cleanupQueryTableDataCond(&cond); if (code != 0) { goto _error; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3c9f2fe8ca..313ab93f8d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -494,8 +494,8 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static int32_t getNumOfElems(SqlFunctionCtx* pCtx) { - int32_t numOfElem = 0; +static int64_t getNumOfElems(SqlFunctionCtx* pCtx) { + int64_t numOfElem = 0; /* * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataSMAIsSet == true; @@ -528,7 +528,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) { * count function does not use the pCtx->interResBuf to keep the intermediate buffer */ int32_t countFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElem = 0; + int64_t numOfElem = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -555,7 +555,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { } int32_t countInvertFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElem = getNumOfElems(pCtx); + int64_t numOfElem = getNumOfElems(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); char* buf = GET_ROWCELL_INTERBUF(pResInfo); @@ -1929,7 +1929,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) { SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - qDebug("%s total %d rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto); + qDebug("%s total %" PRId64 " rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto); int32_t start = pInput->startRowIndex; for (int32_t i = start; i < start + pInput->numOfRows; ++i) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ec4049a3eb..92c7852dbc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -199,7 +199,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_ERR_JRET(code); } - QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); + QW_TASK_DLOG("data put into sink, rows:%" PRId64 ", continueExecTask:%d", pRes->info.rows, qcontinue); } if (numOfResBlock == 0 || (hasMore == false)) {