From ff7168f18b71314e88c298d7023b817a382f7b9c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 13:39:42 +0800 Subject: [PATCH 1/2] correct terrno usage --- source/libs/catalog/inc/catalogInt.h | 7 +- source/libs/catalog/src/catalog.c | 121 +++++++++------------------ 2 files changed, 45 insertions(+), 83 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 455c82b1bc..7b26079f11 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -70,9 +70,10 @@ extern int32_t ctgDebugFlag; #define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) -#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) -#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) +#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) +#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #ifdef __cplusplus diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 248cdbe51c..32cedb82b0 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -49,16 +49,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen); - if (code) { - return code; - } + CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen)); char *pMsg = rpcMallocCont(msgLen); if (NULL == pMsg) { ctgError("rpc malloc %d failed", msgLen); tfree(msg); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } memcpy(pMsg, msg, msgLen); @@ -76,13 +73,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { ctgError("error rsp for use db, code:%x", rpcRsp.code); - return rpcRsp.code; + CTG_ERR_RET(rpcRsp.code); } - code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen); - if (code) { - return code; - } + CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen)); return TSDB_CODE_SUCCESS; } @@ -114,14 +108,14 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, if ((*stbMeta)->suid != tbMeta->suid) { ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); *pTableMeta = calloc(1, metaSize); if (NULL == *pTableMeta) { ctgError("calloc size[%d] failed", metaSize); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); @@ -131,7 +125,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, *pTableMeta = calloc(1, metaSize); if (NULL == *pTableMeta) { ctgError("calloc size[%d] failed", metaSize); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } memcpy(*pTableMeta, tbMeta, metaSize); @@ -155,7 +149,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } char tbFullName[TSDB_TABLE_FNAME_LEN]; @@ -167,10 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); - if (code) { - return code; - } + CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen)); SRpcMsg rpcMsg = { .msgType = TSDB_MSG_TYPE_TABLE_META, @@ -187,13 +178,10 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE if (TSDB_CODE_SUCCESS != rpcRsp.code) { ctgError("error rsp for table meta, code:%x", rpcRsp.code); - return rpcRsp.code; + CTG_ERR_RET(rpcRsp.code); } - code = queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen); - if (code) { - return code; - } + CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen)); return TSDB_CODE_SUCCESS; } @@ -219,7 +207,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * if (NULL == taosArrayPush(vgroupList, vgInfo)) { ctgError("taosArrayPush failed"); - break; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } pIter = taosHashIterate(dbInfo->vgInfo, pIter); @@ -233,7 +221,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); if (vgNum <= 0) { ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); - return TSDB_CODE_TSC_DB_NOT_SELECTED; + CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); } tableNameHashFp fp = NULL; @@ -260,7 +248,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co if (NULL == vgInfo) { ctgError("no hash range found for hashvalue[%u]", hashValue); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } *pVgroup = *vgInfo; @@ -268,35 +256,9 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co return TSDB_CODE_SUCCESS; } - - -STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { - assert(pChild != NULL); - int32_t total = pChild->numOfColumns + pChild->numOfTags; - - STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total); - pTableMeta->tableType = TSDB_SUPER_TABLE; - pTableMeta->tableInfo.numOfTags = pChild->numOfTags; - pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns; - pTableMeta->tableInfo.precision = pChild->precision; - - pTableMeta->uid = pChild->suid; - pTableMeta->tversion = pChild->tversion; - pTableMeta->sversion = pChild->sversion; - - memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total); - - int32_t num = pTableMeta->tableInfo.numOfColumns; - for(int32_t i = 0; i < num; ++i) { - pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; - } - - return pTableMeta; -} - int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; @@ -315,7 +277,7 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* if (0 == exist) { ctgError("get table meta from cache failed, but fetch succeed"); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } return TSDB_CODE_SUCCESS; @@ -325,19 +287,19 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -345,13 +307,13 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -389,7 +351,7 @@ error_exit: pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } int32_t catalogInit(SCatalogCfg *cfg) { @@ -411,12 +373,12 @@ int32_t catalogInit(SCatalogCfg *cfg) { int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { if (NULL == clusterId || NULL == catalogHandle) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == ctgMgmt.pCluster) { ctgError("cluster cache are not ready"); - return TSDB_CODE_CTG_NOT_READY; + CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); } size_t clen = strlen(clusterId); @@ -430,7 +392,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) clusterCtg = calloc(1, sizeof(*clusterCtg)); if (NULL == clusterCtg) { ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; @@ -438,7 +400,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { ctgError("put cluster %s cache to hash failed", clusterId); tfree(clusterCtg); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } *catalogHandle = clusterCtg; @@ -448,7 +410,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { if (NULL == pCatalog || NULL == dbName || NULL == version) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == pCatalog->dbCache.cache) { @@ -469,7 +431,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (dbInfo->vgVersion < 0) { @@ -485,7 +447,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->dbCache.cache) { ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } else { SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); @@ -497,7 +459,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { ctgError("push to vgroup hash cache failed"); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } return TSDB_CODE_SUCCESS; @@ -508,11 +470,10 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; - int32_t code = 0; if (0 == forceUpdate) { CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); @@ -537,7 +498,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* *dbInfo = DbOut.dbVgroup; } - return code; + return TSDB_CODE_SUCCESS; } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { @@ -546,7 +507,7 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } SVgroupInfo vgroupInfo = {0}; @@ -570,7 +531,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } STableMeta *tbMeta = NULL; @@ -588,7 +549,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); - return TSDB_CODE_CTG_INTERNAL_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { @@ -600,7 +561,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S _return: tfree(tbMeta); - return code; + CTG_RET(code); } @@ -613,18 +574,18 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const S if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); - return TSDB_CODE_TSC_DB_NOT_SELECTED; + CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); } CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); - return code; + CTG_RET(code); } int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { - return TSDB_CODE_CTG_INVALID_INPUT; + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t code = 0; @@ -636,7 +597,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); if (NULL == pRsp->pTableMeta) { ctgError("taosArrayInit num[%d] failed", tbNum); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -670,7 +631,7 @@ _return: taosArrayDestroy(pRsp->pTableMeta); } - return code; + CTG_RET(code); } void catalogDestroy(void) { From d037b24e059f0ae25e6e987cf4c770b4abe30ecd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 17 Dec 2021 13:40:27 +0800 Subject: [PATCH 2/2] refactor wal --- include/common/taosmsg.h | 5 +- include/dnode/vnode/tq/tq.h | 13 ++--- source/dnode/mgmt/impl/src/dndTransport.c | 3 +- source/dnode/vnode/impl/inc/vnodeStateMgr.h | 8 +-- source/dnode/vnode/impl/src/vnodeWrite.c | 23 +++++++-- source/dnode/vnode/tq/inc/tqInt.h | 20 ++++---- source/dnode/vnode/tq/inc/tqMetaStore.h | 27 ++++------ source/dnode/vnode/tq/src/tq.c | 8 ++- source/libs/wal/inc/walInt.h | 56 ++++++++++----------- source/libs/wal/src/walMeta.c | 14 +++--- source/libs/wal/src/walMgmt.c | 2 +- source/libs/wal/src/walRead.c | 12 ++--- source/libs/wal/src/walSeek.c | 12 ++--- source/libs/wal/src/walWrite.c | 32 ++++++------ 14 files changed, 123 insertions(+), 112 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8abfe0ffed..1cc6c3b5a2 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -50,10 +50,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) // message from client to mnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_ACCT, "alter-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_ACCT, "drop-acct" ) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index d6f7b46870..7993a8f1ab 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -62,10 +62,10 @@ typedef struct TmqDisconnectRsp { int8_t status; } TmqDisconnectRsp; -typedef struct TmqConsumeReq { +typedef struct STqConsumeReq { TmqMsgHead head; TmqAcks acks; -} TmqConsumeReq; +} STqConsumeReq; typedef struct TmqMsgContent { int64_t topicId; @@ -73,11 +73,11 @@ typedef struct TmqMsgContent { char msg[]; } TmqMsgContent; -typedef struct TmqConsumeRsp { +typedef struct STqConsumeRsp { TmqMsgHead head; int64_t bodySize; TmqMsgContent msgs[]; -} TmqConsumeRsp; +} STqConsumeRsp; typedef struct TmqSubscribeReq { TmqMsgHead head; @@ -261,13 +261,14 @@ typedef struct STQ { // open in each vnode STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); -void tqDestroy(STQ*); +void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); +int tqSetCursor(STQ*, void* msg); -int tqConsume(STQ*, TmqConsumeReq*); +int tqConsume(STQ*, STqConsumeReq*); STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 6dc46cefcd..1db92644ae 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -44,8 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_MQ_ACK] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_MQ_RESET] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET] = dndProcessVnodeWriteMsg; // msg from client to mnode pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; diff --git a/source/dnode/vnode/impl/inc/vnodeStateMgr.h b/source/dnode/vnode/impl/inc/vnodeStateMgr.h index 788426e25e..5862b304ed 100644 --- a/source/dnode/vnode/impl/inc/vnodeStateMgr.h +++ b/source/dnode/vnode/impl/inc/vnodeStateMgr.h @@ -21,13 +21,13 @@ extern "C" { #endif typedef struct { - uint64_t processed; - uint64_t committed; - uint64_t applied; + int64_t processed; + int64_t committed; + int64_t applied; } SVState; #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_STATE_MGR_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_STATE_MGR_H_*/ diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 85460e8d91..85e044266a 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -15,16 +15,31 @@ #include "vnodeDef.h" +int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { + SVnodeReq *pVnodeReq; + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_MQ_SET: + if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) { + // TODO: handle error + } + break; + } + + void *pBuf = pMsg->pCont; + return 0; +} + int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg * pMsg; + SRpcMsg *pMsg; SVnodeReq *pVnodeReq; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); // ser request version - void * pBuf = pMsg->pCont; - uint64_t ver = pVnode->state.processed++; + void *pBuf = pMsg->pCont; + int64_t ver = pVnode->state.processed++; taosEncodeFixedU64(&pBuf, ver); if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { @@ -99,4 +114,4 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return 0; } -/* ------------------------ STATIC METHODS ------------------------ */ \ No newline at end of file +/* ------------------------ STATIC METHODS ------------------------ */ diff --git a/source/dnode/vnode/tq/inc/tqInt.h b/source/dnode/vnode/tq/inc/tqInt.h index 100149c0ea..022b599816 100644 --- a/source/dnode/vnode/tq/inc/tqInt.h +++ b/source/dnode/vnode/tq/inc/tqInt.h @@ -22,16 +22,16 @@ extern "C" { #endif -//create persistent storage for meta info such as consuming offset -//return value > 0: cgId -//return value <= 0: error code -//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); -//create ring buffer in memory and load consuming offset -//int tqOpenTCGroup(STQ*, const char* topic, int cgId); -//destroy ring buffer and persist consuming offset -//int tqCloseTCGroup(STQ*, const char* topic, int cgId); -//delete persistent storage for meta info -//int tqDropTCGroup(STQ*, const char* topic, int cgId); +// create persistent storage for meta info such as consuming offset +// return value > 0: cgId +// return value <= 0: error code +// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); +// create ring buffer in memory and load consuming offset +// int tqOpenTCGroup(STQ*, const char* topic, int cgId); +// destroy ring buffer and persist consuming offset +// int tqCloseTCGroup(STQ*, const char* topic, int cgId); +// delete persistent storage for meta info +// int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 253852b00f..5bcedaed74 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -23,27 +23,22 @@ extern "C" { #endif - -STqMetaStore* tqStoreOpen(const char* path, - FTqSerialize pSerializer, - FTqDeserialize pDeserializer, - FTqDelete pDeleter, - int32_t tqConfigFlag - ); +STqMetaStore* tqStoreOpen(const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, FTqDelete pDeleter, + int32_t tqConfigFlag); int32_t tqStoreClose(STqMetaStore*); -//int32_t tqStoreDelete(TqMetaStore*); -//int32_t tqStoreCommitAll(TqMetaStore*); -int32_t tqStorePersist(STqMetaStore*); -//clean deleted idx and data from persistent file -int32_t tqStoreCompact(STqMetaStore*); +// int32_t tqStoreDelete(TqMetaStore*); +// int32_t tqStoreCommitAll(TqMetaStore*); +int32_t tqStorePersist(STqMetaStore*); +// clean deleted idx and data from persistent file +int32_t tqStoreCompact(STqMetaStore*); -void* tqHandleGet(STqMetaStore*, int64_t key); -//make it unpersist +void* tqHandleGet(STqMetaStore*, int64_t key); +// make it unpersist void* tqHandleTouchGet(STqMetaStore*, int64_t key); int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); -//delete committed kv pair -//notice that a delete action still needs to be committed +// delete committed kv pair +// notice that a delete action still needs to be committed int32_t tqHandleDel(STqMetaStore*, int64_t key); int32_t tqHandleCommit(STqMetaStore*, int64_t key); int32_t tqHandleAbort(STqMetaStore*, int64_t key); diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 1326666857..2c07529219 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -214,7 +214,11 @@ int tqCommit(STQ* pTq) { return 0; } -int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { +int tqSetCursor(STQ* pTq, void* msg) { + return 0; +} + +int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { if (!tqProtoCheck((TmqMsgHead*)pMsg)) { // proto version invalid return -1; @@ -232,7 +236,7 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { } } - TmqConsumeRsp* pRsp = (TmqConsumeRsp*)pMsg; + STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) { // fetch error diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 819afcc411..1579cad7b6 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -16,72 +16,70 @@ #ifndef _TD_WAL_INT_H_ #define _TD_WAL_INT_H_ -#include "wal.h" #include "compare.h" #include "tchecksum.h" +#include "wal.h" #ifdef __cplusplus extern "C" { #endif -//meta section begin +// meta section begin typedef struct WalFileInfo { int64_t firstVer; int64_t lastVer; int64_t createTs; int64_t closeTs; int64_t fileSize; -} WalFileInfo; +} SWalFileInfo; typedef struct WalIdxEntry { int64_t ver; int64_t offset; -} WalIdxEntry; +} SWalIdxEntry; static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { - WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; - WalFileInfo* pInfoRight = (WalFileInfo*)pRight; + SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft; + SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight; return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer); } static inline int64_t walGetLastFileSize(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); return pInfo->fileSize; } static inline int64_t walGetLastFileFirstVer(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); return pInfo->firstVer; } static inline int64_t walGetCurFileFirstVer(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileLastVer(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileOffset(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->fileSize; } -static inline bool walCurFileClosed(SWal* pWal) { - return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; +static inline bool walCurFileClosed(SWal* pWal) { return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; } + +static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) { + return (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); } -static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { - return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); -} - -static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { +static inline int walBuildLogName(SWal* pWal, int64_t fileFirstVer, char* buf) { return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); } -static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { +static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) { return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); } @@ -93,11 +91,11 @@ static inline int walValidBodyCksum(SWalHead* pHead) { return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); } -static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { +static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); } -static inline uint32_t walCalcHeadCksum(SWalHead *pHead) { +static inline uint32_t walCalcHeadCksum(SWalHead* pHead) { return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead)); } @@ -106,7 +104,7 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { } static inline int64_t walGetVerIdxOffset(SWal* pWal, int64_t ver) { - return (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); + return (ver - walGetCurFileFirstVer(pWal)) * sizeof(SWalIdxEntry); } static inline void walResetVer(SWalVer* pVer) { @@ -126,16 +124,16 @@ int walCheckAndRepairMeta(SWal* pWal); int walCheckAndRepairIdx(SWal* pWal); char* walMetaSerialize(SWal* pWal); -int walMetaDeserialize(SWal* pWal, const char* bytes); -//meta section end +int walMetaDeserialize(SWal* pWal, const char* bytes); +// meta section end -//seek section -int walChangeFile(SWal *pWal, int64_t ver); -//seek section end +// seek section +int walChangeFile(SWal* pWal, int64_t ver); +// seek section end int64_t walGetSeq(); -int walSeekVer(SWal *pWal, int64_t ver); -int walRoll(SWal *pWal); +int walSeekVer(SWal* pWal, int64_t ver); +int walRoll(SWal* pWal); #ifdef __cplusplus } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index e86a2b6221..0f155f9553 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -79,13 +79,13 @@ int walRollFileInfo(SWal* pWal) { SArray* pArray = pWal->fileInfoSet; if (taosArrayGetSize(pArray) != 0) { - WalFileInfo* pInfo = taosArrayGetLast(pArray); + SWalFileInfo* pInfo = taosArrayGetLast(pArray); pInfo->lastVer = pWal->vers.lastVer; pInfo->closeTs = ts; } // TODO: change to emplace back - WalFileInfo* pNewInfo = malloc(sizeof(WalFileInfo)); + SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo)); if (pNewInfo == NULL) { return -1; } @@ -122,9 +122,9 @@ char* walMetaSerialize(SWal* pWal) { cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddItemToObject(pRoot, "files", pFiles); - WalFileInfo* pData = pWal->fileInfoSet->pData; + SWalFileInfo* pData = pWal->fileInfoSet->pData; for (int i = 0; i < sz; i++) { - WalFileInfo* pInfo = &pData[i]; + SWalFileInfo* pInfo = &pData[i]; cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()); if (pField == NULL) { cJSON_Delete(pRoot); @@ -167,10 +167,10 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { // deserialize SArray* pArray = pWal->fileInfoSet; taosArrayEnsureCap(pArray, sz); - WalFileInfo* pData = pArray->pData; + SWalFileInfo* pData = pArray->pData; for (int i = 0; i < sz; i++) { - cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); - WalFileInfo* pInfo = &pData[i]; + cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); + SWalFileInfo* pInfo = &pData[i]; pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 5b6a8c6b29..189881c86d 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -92,7 +92,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->writeLogTfd = -1; pWal->writeIdxTfd = -1; pWal->writeCur = -1; - pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); + pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); if (pWal->fileInfoSet == NULL) { wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); free(pWal); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b6e232fa5c..48eb84b536 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -52,13 +52,13 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i int64_t logTfd = pRead->readLogTfd; // seek position - int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry); + int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if (code < 0) { return -1; } - WalIdxEntry entry; - if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { + SWalIdxEntry entry; + if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { return -1; } // TODO:deserialize @@ -105,10 +105,10 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { if (ver < pWal->vers.snapshotVer) { } - WalFileInfo tmpInfo; + SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; // bsearch in fileSet - WalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); if (pRead->curFileFirstVer != pRet->firstVer) { code = walReadChangeFile(pRead, pRet->firstVer); @@ -159,9 +159,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { return -1; } - /*code = walValidBodyCksum(pRead->pHead);*/ ASSERT(pRead->pHead->head.version == ver); + code = walValidBodyCksum(pRead->pHead); if (code != 0) { return -1; } diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 82c596d225..1f9b7b6e58 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -32,9 +32,9 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { if (code != 0) { return -1; } - WalIdxEntry entry; + SWalIdxEntry entry; // TODO:deserialize - code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); + code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)); if (code != 0) { return -1; } @@ -47,8 +47,8 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { } int walChangeFileToLast(SWal* pWal) { - int64_t idxTfd, logTfd; - WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); + int64_t idxTfd, logTfd; + SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); ASSERT(pRet != NULL); int64_t fileFirstVer = pRet->firstVer; @@ -83,10 +83,10 @@ int walChangeFile(SWal* pWal, int64_t ver) { // TODO return -1; } - WalFileInfo tmpInfo; + SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; // bsearch in fileSet - WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); int64_t fileFirstVer = pRet->firstVer; // closed diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c3a7ca5f4d..c8ffd9d07d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -56,9 +56,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // delete files int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); for (int i = pWal->writeCur; i < fileSetSize; i++) { - walBuildLogName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); remove(fnameStr); - walBuildIdxName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); remove(fnameStr); } // pop from fileInfoSet @@ -81,8 +81,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } // read idx file and get log file pos // TODO:change to deserialize function - WalIdxEntry entry; - if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { + SWalIdxEntry entry; + if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { pthread_mutex_unlock(&pWal->mutex); return -1; } @@ -128,8 +128,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return -1; } pWal->vers.lastVer = ver - 1; - ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; - ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; + ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; + ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; // unlock pthread_mutex_unlock(&pWal->mutex); @@ -153,17 +153,17 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - int deleteCnt = 0; - int64_t newTotSize = pWal->totSize; - WalFileInfo tmp; + int deleteCnt = 0; + int64_t newTotSize = pWal->totSize; + SWalFileInfo tmp; tmp.firstVer = ver; // find files safe to delete - WalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); if (ver >= pInfo->lastVer) { pInfo++; } // iterate files, until the searched result - for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { + for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) { // delete according to file size or close time deleteCnt++; @@ -173,7 +173,7 @@ int32_t walEndSnapshot(SWal *pWal) { char fnameStr[WAL_FILE_LEN]; // remove file for (int i = 0; i < deleteCnt; i++) { - WalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); + SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); remove(fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr); @@ -186,7 +186,7 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->writeCur = -1; pWal->vers.firstVer = -1; } else { - pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; ; @@ -248,9 +248,9 @@ int walRoll(SWal *pWal) { } static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { - WalIdxEntry entry = {.ver = ver, .offset = offset}; - int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry)); - if (size != sizeof(WalIdxEntry)) { + SWalIdxEntry entry = {.ver = ver, .offset = offset}; + int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry)); + if (size != sizeof(SWalIdxEntry)) { // TODO truncate return -1; }