Merge pull request #24088 from taosdata/fix/3_liaohj

fix(query): add rows info if blocks exist.
This commit is contained in:
Haojun Liao 2023-12-15 18:24:45 +08:00 committed by GitHub
commit a847bf468a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 14 deletions

View File

@ -847,11 +847,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
// create stb for stream
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
@ -938,7 +939,8 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
mDebug("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId);
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
pStream->checkpointId);
sdbRelease(pSdb, pStream);
}
@ -1380,12 +1382,12 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);
return -1;
}
// bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
// if (conflict) {
// sdbRelease(pMnode->pSdb, pStream);
// tFreeMDropStreamReq(&dropReq);
// return -1;
// }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME);
if (pTrans == NULL) {

View File

@ -4720,14 +4720,13 @@ static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t num
}
int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
const int32_t numOfBuckets = 20.0;
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
pTableBlockInfo->numOfVgroups = 1;
const int32_t numOfBuckets = 20.0;
const int32_t defaultRows = 4096;
// find the start data block in file
tsdbAcquireReader(pReader);
if (pReader->flag == READER_STATUS_SUSPEND) {

View File

@ -5634,7 +5634,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
double factor = pData->numOfBlocks / 80.0;
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
int32_t bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
int32_t bucketRange = ceil(((double) (pData->defMaxRows - pData->defMinRows)) / numOfBuckets);
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
@ -5649,7 +5649,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
len += x;
}
if (num > 0) {
if (pData->blockRowsHisto[i] > 0) {
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
}