diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index 4123bdfb58..f165470d10 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -283,6 +283,8 @@ Provides dnode configuration information. | 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 3 | vgroup_id | INT | Vgroup ID for the consumer | | 4 | consumer_id | BIGINT | Consumer ID | +| 5 | offset | BINARY(64) | Consumption progress | +| 6 | rows | BIGINT | Number of consumption items | ## INS_STREAMS diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3fffbd0706..fe8d6d4c69 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 4 | consumer_id | BIGINT | 消费者的唯一 id | +| 5 | offset | BINARY(64) | 消费者的消费进度 | +| 6 | rows | BIGINT | 消费者的消费的数据条数 | ## INS_STREAMS diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 469416cd1b..896b0713df 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -720,15 +720,39 @@ int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) { int code = 0; SPgno pgno = TDB_PAGE_PGNO(pPage); + if (pPager->frps) { + taosArrayPush(pPager->frps, &pgno); + pPage->pPager = NULL; + return code; + } + + pPager->frps = taosArrayInit(8, sizeof(SPgno)); // memset(pPage->pData, 0, pPage->pageSize); tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno); // printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno); code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn); if (code < 0) { tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code); + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; return -1; } + while (TARRAY_SIZE(pPager->frps) > 0) { + pgno = *(SPgno *)taosArrayPop(pPager->frps); + + code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn); + if (code < 0) { + tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code); + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; + return -1; + } + } + + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; + pPage->pPager = NULL; return code; @@ -739,7 +763,11 @@ static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) { TBC *pCur; if (!pPager->pEnv->pFreeDb) { - return 0; + return code; + } + + if (pPager->frps) { + return code; } code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn); diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 8defe54868..8ce294a3c6 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -408,6 +408,7 @@ struct SPager { // u8 inTran; TXN *pActiveTxn; SArray *ofps; + SArray *frps; SPager *pNext; // used by TDB SPager *pHashNext; // used by TDB #ifdef USE_MAINDB