Merge branch '3.0' into fix/TD-19254-D
This commit is contained in:
commit
624eb89851
|
@ -420,7 +420,18 @@ let mut consumer = tmq.build()?;
|
|||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例:
|
||||
|
||||
```python
|
||||
from taos.tmq import TaosConsumer
|
||||
|
||||
# Syntax: `consumer = TaosConsumer(*topics, **args)`
|
||||
#
|
||||
# Example:
|
||||
consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local")
|
||||
```
|
||||
|
||||
其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置:
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
|
|
|
@ -343,6 +343,8 @@ typedef struct {
|
|||
} SSchemaWrapper;
|
||||
|
||||
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
|
||||
if (pSchemaWrapper->pSchema == NULL) return NULL;
|
||||
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||
if (pSW == NULL) return pSW;
|
||||
pSW->nCols = pSchemaWrapper->nCols;
|
||||
|
@ -352,6 +354,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
|
|||
taosMemoryFree(pSW);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
|
||||
return pSW;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
if (tEncodeI64(pCoder, pME->version) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pME->type) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pME->uid) < 0) return -1;
|
||||
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
||||
if (pME->name == NULL || tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
||||
|
||||
if (pME->type == TSDB_SUPER_TABLE) {
|
||||
if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor?
|
||||
|
|
|
@ -505,6 +505,7 @@ typedef struct SCtgOperation {
|
|||
#define CTG_FLAG_UNKNOWN_STB 0x4
|
||||
#define CTG_FLAG_SYS_DB 0x8
|
||||
#define CTG_FLAG_FORCE_UPDATE 0x10
|
||||
#define CTG_FLAG_ONLY_CACHE 0x20
|
||||
|
||||
#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v))
|
||||
|
||||
|
@ -783,7 +784,7 @@ void ctgFreeQNode(SCtgQNode* node);
|
|||
void ctgClearHandle(SCatalog* pCtg);
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache* pCache);
|
||||
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
|
||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup);
|
||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
||||
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
|
|
|
@ -23,12 +23,21 @@
|
|||
SCatalogMgmt gCtgMgmt = {0};
|
||||
|
||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SCtgDBCache** dbCache,
|
||||
SDBVgInfo** pInfo) {
|
||||
SDBVgInfo** pInfo, bool* exists) {
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache));
|
||||
|
||||
if (*dbCache) {
|
||||
if (exists) {
|
||||
*exists = true;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (exists) {
|
||||
*exists = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -94,7 +103,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
|
|||
int32_t code = 0;
|
||||
|
||||
if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
|
||||
CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo));
|
||||
CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo, NULL));
|
||||
}
|
||||
|
||||
STableMetaOutput moutput = {0};
|
||||
|
@ -194,7 +203,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
|
|||
STableMetaOutput* output = NULL;
|
||||
|
||||
CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, ctx, pTableMeta));
|
||||
if (*pTableMeta) {
|
||||
if (*pTableMeta || (ctx->flag & CTG_FLAG_ONLY_CACHE)) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
|
@ -415,7 +424,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
|
|||
CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL));
|
||||
} else {
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo));
|
||||
CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo, NULL));
|
||||
CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL));
|
||||
}
|
||||
|
||||
|
@ -441,7 +450,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl
|
|||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
SHashObj* vgHash = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, NULL));
|
||||
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgCache.vgInfo->vgHash;
|
||||
|
@ -501,7 +510,7 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup) {
|
||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) {
|
||||
if (IS_SYS_DBNAME(pTableName->dbname)) {
|
||||
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
|
@ -513,8 +522,13 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
|
|||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
SDBVgInfo* vgInfo = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, exists));
|
||||
|
||||
if (exists && false == *exists) {
|
||||
ctgDebug("db %s vgInfo not in cache", pTableName->dbname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
|
||||
|
||||
_return:
|
||||
|
@ -737,7 +751,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
|
|||
SArray* vgList = NULL;
|
||||
SHashObj* vgHash = NULL;
|
||||
SDBVgInfo* vgInfo = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo));
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL));
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgCache.vgInfo->vgHash;
|
||||
} else {
|
||||
|
@ -880,6 +894,17 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName
|
|||
CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta));
|
||||
}
|
||||
|
||||
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
SCtgTbMetaCtx ctx = {0};
|
||||
ctx.pName = (SName*)pTableName;
|
||||
ctx.flag = CTG_FLAG_UNKNOWN_STB | CTG_FLAG_ONLY_CACHE;
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta));
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
STableMeta** pTableMeta) {
|
||||
CTG_API_ENTER();
|
||||
|
@ -891,6 +916,18 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam
|
|||
CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta));
|
||||
}
|
||||
|
||||
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
STableMeta** pTableMeta) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
SCtgTbMetaCtx ctx = {0};
|
||||
ctx.pName = (SName*)pTableName;
|
||||
ctx.flag = CTG_FLAG_STB | CTG_FLAG_ONLY_CACHE;
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta));
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
|
@ -1009,7 +1046,14 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
|
|||
SVgroupInfo* pVgroup) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup));
|
||||
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL));
|
||||
}
|
||||
|
||||
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
SVgroupInfo* pVgroup, bool* exists) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, exists));
|
||||
}
|
||||
|
||||
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
|
||||
|
|
|
@ -1094,6 +1094,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
|
|||
// fetch next ofp, a new ofp and make it dirty
|
||||
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
|
||||
if (ret < 0) {
|
||||
tdbFree(pBuf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1135,6 +1136,11 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
|
|||
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
|
||||
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
|
||||
|
||||
if (ofp == NULL) {
|
||||
tdbFree(pBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
|
||||
if (ret < 0) {
|
||||
tdbFree(pBuf);
|
||||
|
|
|
@ -106,7 +106,8 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
|
|||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerBegin(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to begin pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId);
|
||||
tdbError("failed to begin pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
|
||||
pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -121,7 +122,8 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
|
|||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerCommit(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId);
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
|
||||
pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -151,7 +153,8 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
|
|||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerAbort(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to abort pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId);
|
||||
tdbError("failed to abort pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
|
||||
pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
|
|||
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
|
||||
if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) {
|
||||
// TODO: handle error
|
||||
tdbOsFree(pCache->aPage);
|
||||
tdbOsFree(aPage);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -575,7 +575,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
|
|||
|
||||
offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
|
||||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
|
||||
tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -630,7 +630,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
|
||||
i64 offset = pPager->pageSize * (pgno - 1);
|
||||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
|
||||
tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue