Merge remote-tracking branch 'origin/3.0' into feature/dnode3

This commit is contained in:
Shengliang Guan 2021-12-17 13:50:33 +08:00
commit 7bf10f1a48
16 changed files with 168 additions and 195 deletions

View File

@ -50,8 +50,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )

View File

@ -62,10 +62,10 @@ typedef struct TmqDisconnectRsp {
int8_t status; int8_t status;
} TmqDisconnectRsp; } TmqDisconnectRsp;
typedef struct TmqConsumeReq { typedef struct STqConsumeReq {
TmqMsgHead head; TmqMsgHead head;
TmqAcks acks; TmqAcks acks;
} TmqConsumeReq; } STqConsumeReq;
typedef struct TmqMsgContent { typedef struct TmqMsgContent {
int64_t topicId; int64_t topicId;
@ -73,11 +73,11 @@ typedef struct TmqMsgContent {
char msg[]; char msg[];
} TmqMsgContent; } TmqMsgContent;
typedef struct TmqConsumeRsp { typedef struct STqConsumeRsp {
TmqMsgHead head; TmqMsgHead head;
int64_t bodySize; int64_t bodySize;
TmqMsgContent msgs[]; TmqMsgContent msgs[];
} TmqConsumeRsp; } STqConsumeRsp;
typedef struct TmqSubscribeReq { typedef struct TmqSubscribeReq {
TmqMsgHead head; TmqMsgHead head;
@ -261,13 +261,14 @@ typedef struct STQ {
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
void tqDestroy(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int tqSetCursor(STQ*, void* msg);
int tqConsume(STQ*, TmqConsumeReq*); int tqConsume(STQ*, STqConsumeReq*);
STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);

View File

@ -44,8 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_ACK] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_RESET] = dndProcessVnodeWriteMsg;
// msg from client to mnode // msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;

View File

@ -21,9 +21,9 @@ extern "C" {
#endif #endif
typedef struct { typedef struct {
uint64_t processed; int64_t processed;
uint64_t committed; int64_t committed;
uint64_t applied; int64_t applied;
} SVState; } SVState;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -15,16 +15,31 @@
#include "vnodeDef.h" #include "vnodeDef.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
SVnodeReq *pVnodeReq;
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_MQ_SET:
if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) {
// TODO: handle error
}
break;
}
void *pBuf = pMsg->pCont;
return 0;
}
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg * pMsg; SRpcMsg *pMsg;
SVnodeReq *pVnodeReq; SVnodeReq *pVnodeReq;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version // ser request version
void * pBuf = pMsg->pCont; void *pBuf = pMsg->pCont;
uint64_t ver = pVnode->state.processed++; int64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver); taosEncodeFixedU64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {

View File

@ -22,16 +22,16 @@
extern "C" { extern "C" {
#endif #endif
//create persistent storage for meta info such as consuming offset // create persistent storage for meta info such as consuming offset
//return value > 0: cgId // return value > 0: cgId
//return value <= 0: error code // return value <= 0: error code
//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); // int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
//create ring buffer in memory and load consuming offset // create ring buffer in memory and load consuming offset
//int tqOpenTCGroup(STQ*, const char* topic, int cgId); // int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset // destroy ring buffer and persist consuming offset
//int tqCloseTCGroup(STQ*, const char* topic, int cgId); // int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info // delete persistent storage for meta info
//int tqDropTCGroup(STQ*, const char* topic, int cgId); // int tqDropTCGroup(STQ*, const char* topic, int cgId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,27 +23,22 @@
extern "C" { extern "C" {
#endif #endif
STqMetaStore* tqStoreOpen(const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, FTqDelete pDeleter,
STqMetaStore* tqStoreOpen(const char* path, int32_t tqConfigFlag);
FTqSerialize pSerializer,
FTqDeserialize pDeserializer,
FTqDelete pDeleter,
int32_t tqConfigFlag
);
int32_t tqStoreClose(STqMetaStore*); int32_t tqStoreClose(STqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); // int32_t tqStoreDelete(TqMetaStore*);
//int32_t tqStoreCommitAll(TqMetaStore*); // int32_t tqStoreCommitAll(TqMetaStore*);
int32_t tqStorePersist(STqMetaStore*); int32_t tqStorePersist(STqMetaStore*);
//clean deleted idx and data from persistent file // clean deleted idx and data from persistent file
int32_t tqStoreCompact(STqMetaStore*); int32_t tqStoreCompact(STqMetaStore*);
void* tqHandleGet(STqMetaStore*, int64_t key); void* tqHandleGet(STqMetaStore*, int64_t key);
//make it unpersist // make it unpersist
void* tqHandleTouchGet(STqMetaStore*, int64_t key); void* tqHandleTouchGet(STqMetaStore*, int64_t key);
int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value);
int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
//delete committed kv pair // delete committed kv pair
//notice that a delete action still needs to be committed // notice that a delete action still needs to be committed
int32_t tqHandleDel(STqMetaStore*, int64_t key); int32_t tqHandleDel(STqMetaStore*, int64_t key);
int32_t tqHandleCommit(STqMetaStore*, int64_t key); int32_t tqHandleCommit(STqMetaStore*, int64_t key);
int32_t tqHandleAbort(STqMetaStore*, int64_t key); int32_t tqHandleAbort(STqMetaStore*, int64_t key);

View File

@ -214,7 +214,11 @@ int tqCommit(STQ* pTq) {
return 0; return 0;
} }
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { int tqSetCursor(STQ* pTq, void* msg) {
return 0;
}
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
if (!tqProtoCheck((TmqMsgHead*)pMsg)) { if (!tqProtoCheck((TmqMsgHead*)pMsg)) {
// proto version invalid // proto version invalid
return -1; return -1;
@ -232,7 +236,7 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
} }
} }
TmqConsumeRsp* pRsp = (TmqConsumeRsp*)pMsg; STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) { if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
// fetch error // fetch error

View File

@ -70,9 +70,10 @@ extern int32_t ctgDebugFlag;
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -49,16 +49,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen); CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen));
if (code) {
return code;
}
char *pMsg = rpcMallocCont(msgLen); char *pMsg = rpcMallocCont(msgLen);
if (NULL == pMsg) { if (NULL == pMsg) {
ctgError("rpc malloc %d failed", msgLen); ctgError("rpc malloc %d failed", msgLen);
tfree(msg); tfree(msg);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
memcpy(pMsg, msg, msgLen); memcpy(pMsg, msg, msgLen);
@ -76,13 +73,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for use db, code:%x", rpcRsp.code); ctgError("error rsp for use db, code:%x", rpcRsp.code);
return rpcRsp.code; CTG_ERR_RET(rpcRsp.code);
} }
code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen); CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen));
if (code) {
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -114,14 +108,14 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName,
if ((*stbMeta)->suid != tbMeta->suid) { if ((*stbMeta)->suid != tbMeta->suid) {
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = calloc(1, metaSize); *pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize); ctgError("calloc size[%d] failed", metaSize);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
@ -131,7 +125,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName,
*pTableMeta = calloc(1, metaSize); *pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize); ctgError("calloc size[%d] failed", metaSize);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
@ -155,7 +149,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
char tbFullName[TSDB_TABLE_FNAME_LEN]; char tbFullName[TSDB_TABLE_FNAME_LEN];
@ -167,10 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen));
if (code) {
return code;
}
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_TABLE_META, .msgType = TSDB_MSG_TYPE_TABLE_META,
@ -187,13 +178,10 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for table meta, code:%x", rpcRsp.code); ctgError("error rsp for table meta, code:%x", rpcRsp.code);
return rpcRsp.code; CTG_ERR_RET(rpcRsp.code);
} }
code = queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen); CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen));
if (code) {
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -219,7 +207,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
if (NULL == taosArrayPush(vgroupList, vgInfo)) { if (NULL == taosArrayPush(vgroupList, vgInfo)) {
ctgError("taosArrayPush failed"); ctgError("taosArrayPush failed");
break; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
pIter = taosHashIterate(dbInfo->vgInfo, pIter); pIter = taosHashIterate(dbInfo->vgInfo, pIter);
@ -233,7 +221,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
if (vgNum <= 0) { if (vgNum <= 0) {
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
return TSDB_CODE_TSC_DB_NOT_SELECTED; CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
} }
tableNameHashFp fp = NULL; tableNameHashFp fp = NULL;
@ -260,7 +248,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
if (NULL == vgInfo) { if (NULL == vgInfo) {
ctgError("no hash range found for hashvalue[%u]", hashValue); ctgError("no hash range found for hashvalue[%u]", hashValue);
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
*pVgroup = *vgInfo; *pVgroup = *vgInfo;
@ -268,35 +256,9 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
int32_t total = pChild->numOfColumns + pChild->numOfTags;
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
pTableMeta->tableType = TSDB_SUPER_TABLE;
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
pTableMeta->tableInfo.precision = pChild->precision;
pTableMeta->uid = pChild->suid;
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
int32_t exist = 0; int32_t exist = 0;
@ -315,7 +277,7 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
if (0 == exist) { if (0 == exist) {
ctgError("get table meta from cache failed, but fetch succeed"); ctgError("get table meta from cache failed, but fetch succeed");
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -325,19 +287,19 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
if (output->metaNum != 1 && output->metaNum != 2) { if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (NULL == output->tbMeta) { if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp"); ctgError("no valid table meta got from meta rsp");
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
@ -345,13 +307,13 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) { if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
@ -389,7 +351,7 @@ error_exit:
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
int32_t catalogInit(SCatalogCfg *cfg) { int32_t catalogInit(SCatalogCfg *cfg) {
@ -411,12 +373,12 @@ int32_t catalogInit(SCatalogCfg *cfg) {
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
if (NULL == clusterId || NULL == catalogHandle) { if (NULL == clusterId || NULL == catalogHandle) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (NULL == ctgMgmt.pCluster) { if (NULL == ctgMgmt.pCluster) {
ctgError("cluster cache are not ready"); ctgError("cluster cache are not ready");
return TSDB_CODE_CTG_NOT_READY; CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
} }
size_t clen = strlen(clusterId); size_t clen = strlen(clusterId);
@ -430,7 +392,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
clusterCtg = calloc(1, sizeof(*clusterCtg)); clusterCtg = calloc(1, sizeof(*clusterCtg));
if (NULL == clusterCtg) { if (NULL == clusterCtg) {
ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg));
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
@ -438,7 +400,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
ctgError("put cluster %s cache to hash failed", clusterId); ctgError("put cluster %s cache to hash failed", clusterId);
tfree(clusterCtg); tfree(clusterCtg);
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
*catalogHandle = clusterCtg; *catalogHandle = clusterCtg;
@ -448,7 +410,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
if (NULL == pCatalog || NULL == dbName || NULL == version) { if (NULL == pCatalog || NULL == dbName || NULL == version) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (NULL == pCatalog->dbCache.cache) { if (NULL == pCatalog->dbCache.cache) {
@ -469,7 +431,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (dbInfo->vgVersion < 0) { if (dbInfo->vgVersion < 0) {
@ -485,7 +447,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) { if (NULL == pCatalog->dbCache.cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} else { } else {
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
@ -497,7 +459,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
ctgError("push to vgroup hash cache failed"); ctgError("push to vgroup hash cache failed");
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -508,11 +470,10 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
int32_t exist = 0; int32_t exist = 0;
int32_t code = 0;
if (0 == forceUpdate) { if (0 == forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
@ -537,7 +498,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
*dbInfo = DbOut.dbVgroup; *dbInfo = DbOut.dbVgroup;
} }
return code; return TSDB_CODE_SUCCESS;
} }
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
@ -546,7 +507,7 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) { int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
SVgroupInfo vgroupInfo = {0}; SVgroupInfo vgroupInfo = {0};
@ -570,7 +531,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
STableMeta *tbMeta = NULL; STableMeta *tbMeta = NULL;
@ -588,7 +549,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t vgId = tbMeta->vgId; int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("vgId[%d] not found in vgroup list", vgId); ctgError("vgId[%d] not found in vgroup list", vgId);
return TSDB_CODE_CTG_INTERNAL_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) {
@ -600,7 +561,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
_return: _return:
tfree(tbMeta); tfree(tbMeta);
return code; CTG_RET(code);
} }
@ -613,18 +574,18 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const S
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
return TSDB_CODE_TSC_DB_NOT_SELECTED; CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
} }
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
return code; CTG_RET(code);
} }
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
return TSDB_CODE_CTG_INVALID_INPUT; CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
int32_t code = 0; int32_t code = 0;
@ -636,7 +597,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) { if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit num[%d] failed", tbNum); ctgError("taosArrayInit num[%d] failed", tbNum);
return TSDB_CODE_CTG_MEM_ERROR; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
@ -670,7 +631,7 @@ _return:
taosArrayDestroy(pRsp->pTableMeta); taosArrayDestroy(pRsp->pTableMeta);
} }
return code; CTG_RET(code);
} }
void catalogDestroy(void) { void catalogDestroy(void) {

View File

@ -16,72 +16,70 @@
#ifndef _TD_WAL_INT_H_ #ifndef _TD_WAL_INT_H_
#define _TD_WAL_INT_H_ #define _TD_WAL_INT_H_
#include "wal.h"
#include "compare.h" #include "compare.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
//meta section begin // meta section begin
typedef struct WalFileInfo { typedef struct WalFileInfo {
int64_t firstVer; int64_t firstVer;
int64_t lastVer; int64_t lastVer;
int64_t createTs; int64_t createTs;
int64_t closeTs; int64_t closeTs;
int64_t fileSize; int64_t fileSize;
} WalFileInfo; } SWalFileInfo;
typedef struct WalIdxEntry { typedef struct WalIdxEntry {
int64_t ver; int64_t ver;
int64_t offset; int64_t offset;
} WalIdxEntry; } SWalIdxEntry;
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft;
WalFileInfo* pInfoRight = (WalFileInfo*)pRight; SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight;
return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer); return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer);
} }
static inline int64_t walGetLastFileSize(SWal* pWal) { static inline int64_t walGetLastFileSize(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline int64_t walGetLastFileFirstVer(SWal* pWal) { static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileFirstVer(SWal* pWal) { static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileLastVer(SWal* pWal) { static inline int64_t walGetCurFileLastVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileOffset(SWal* pWal) { static inline int64_t walGetCurFileOffset(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline bool walCurFileClosed(SWal* pWal) { static inline bool walCurFileClosed(SWal* pWal) { return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; }
return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur;
static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) {
return (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
} }
static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { static inline int walBuildLogName(SWal* pWal, int64_t fileFirstVer, char* buf) {
return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
}
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
} }
static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
} }
@ -93,11 +91,11 @@ static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
} }
static inline uint32_t walCalcHeadCksum(SWalHead *pHead) { static inline uint32_t walCalcHeadCksum(SWalHead* pHead) {
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead)); return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead));
} }
@ -106,7 +104,7 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
} }
static inline int64_t walGetVerIdxOffset(SWal* pWal, int64_t ver) { static inline int64_t walGetVerIdxOffset(SWal* pWal, int64_t ver) {
return (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); return (ver - walGetCurFileFirstVer(pWal)) * sizeof(SWalIdxEntry);
} }
static inline void walResetVer(SWalVer* pVer) { static inline void walResetVer(SWalVer* pVer) {
@ -127,15 +125,15 @@ int walCheckAndRepairIdx(SWal* pWal);
char* walMetaSerialize(SWal* pWal); char* walMetaSerialize(SWal* pWal);
int walMetaDeserialize(SWal* pWal, const char* bytes); int walMetaDeserialize(SWal* pWal, const char* bytes);
//meta section end // meta section end
//seek section // seek section
int walChangeFile(SWal *pWal, int64_t ver); int walChangeFile(SWal* pWal, int64_t ver);
//seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
int walSeekVer(SWal *pWal, int64_t ver); int walSeekVer(SWal* pWal, int64_t ver);
int walRoll(SWal *pWal); int walRoll(SWal* pWal);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -79,13 +79,13 @@ int walRollFileInfo(SWal* pWal) {
SArray* pArray = pWal->fileInfoSet; SArray* pArray = pWal->fileInfoSet;
if (taosArrayGetSize(pArray) != 0) { if (taosArrayGetSize(pArray) != 0) {
WalFileInfo* pInfo = taosArrayGetLast(pArray); SWalFileInfo* pInfo = taosArrayGetLast(pArray);
pInfo->lastVer = pWal->vers.lastVer; pInfo->lastVer = pWal->vers.lastVer;
pInfo->closeTs = ts; pInfo->closeTs = ts;
} }
// TODO: change to emplace back // TODO: change to emplace back
WalFileInfo* pNewInfo = malloc(sizeof(WalFileInfo)); SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo));
if (pNewInfo == NULL) { if (pNewInfo == NULL) {
return -1; return -1;
} }
@ -122,9 +122,9 @@ char* walMetaSerialize(SWal* pWal) {
cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddStringToObject(pMeta, "lastVer", buf);
cJSON_AddItemToObject(pRoot, "files", pFiles); cJSON_AddItemToObject(pRoot, "files", pFiles);
WalFileInfo* pData = pWal->fileInfoSet->pData; SWalFileInfo* pData = pWal->fileInfoSet->pData;
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
WalFileInfo* pInfo = &pData[i]; SWalFileInfo* pInfo = &pData[i];
cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()); cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject());
if (pField == NULL) { if (pField == NULL) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
@ -167,10 +167,10 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
// deserialize // deserialize
SArray* pArray = pWal->fileInfoSet; SArray* pArray = pWal->fileInfoSet;
taosArrayEnsureCap(pArray, sz); taosArrayEnsureCap(pArray, sz);
WalFileInfo* pData = pArray->pData; SWalFileInfo* pData = pArray->pData;
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
WalFileInfo* pInfo = &pData[i]; SWalFileInfo* pInfo = &pData[i];
pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); pField = cJSON_GetObjectItem(pInfoJson, "lastVer");

View File

@ -92,7 +92,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeLogTfd = -1; pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1; pWal->writeIdxTfd = -1;
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
free(pWal); free(pWal);

View File

@ -52,13 +52,13 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
int64_t logTfd = pRead->readLogTfd; int64_t logTfd = pRead->readLogTfd;
// seek position // seek position
int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET); code = tfLseek(idxTfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
WalIdxEntry entry; SWalIdxEntry entry;
if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
return -1; return -1;
} }
// TODO:deserialize // TODO:deserialize
@ -105,10 +105,10 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
WalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
// bsearch in fileSet // bsearch in fileSet
WalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
if (pRead->curFileFirstVer != pRet->firstVer) { if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer); code = walReadChangeFile(pRead, pRet->firstVer);
@ -159,9 +159,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1; return -1;
} }
/*code = walValidBodyCksum(pRead->pHead);*/
ASSERT(pRead->pHead->head.version == ver); ASSERT(pRead->pHead->head.version == ver);
code = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
return -1; return -1;
} }

View File

@ -32,9 +32,9 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
if (code != 0) { if (code != 0) {
return -1; return -1;
} }
WalIdxEntry entry; SWalIdxEntry entry;
// TODO:deserialize // TODO:deserialize
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry));
if (code != 0) { if (code != 0) {
return -1; return -1;
} }
@ -48,7 +48,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int walChangeFileToLast(SWal* pWal) { int walChangeFileToLast(SWal* pWal) {
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
int64_t fileFirstVer = pRet->firstVer; int64_t fileFirstVer = pRet->firstVer;
@ -83,10 +83,10 @@ int walChangeFile(SWal* pWal, int64_t ver) {
// TODO // TODO
return -1; return -1;
} }
WalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
// bsearch in fileSet // bsearch in fileSet
WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
int64_t fileFirstVer = pRet->firstVer; int64_t fileFirstVer = pRet->firstVer;
// closed // closed

View File

@ -56,9 +56,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// delete files // delete files
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = pWal->writeCur; i < fileSetSize; i++) { for (int i = pWal->writeCur; i < fileSetSize; i++) {
walBuildLogName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
remove(fnameStr); remove(fnameStr);
walBuildIdxName(pWal, ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
remove(fnameStr); remove(fnameStr);
} }
// pop from fileInfoSet // pop from fileInfoSet
@ -81,8 +81,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
// read idx file and get log file pos // read idx file and get log file pos
// TODO:change to deserialize function // TODO:change to deserialize function
WalIdxEntry entry; SWalIdxEntry entry;
if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return -1; return -1;
} }
@ -128,8 +128,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1; return -1;
} }
pWal->vers.lastVer = ver - 1; pWal->vers.lastVer = ver - 1;
((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
// unlock // unlock
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
@ -155,15 +155,15 @@ int32_t walEndSnapshot(SWal *pWal) {
int deleteCnt = 0; int deleteCnt = 0;
int64_t newTotSize = pWal->totSize; int64_t newTotSize = pWal->totSize;
WalFileInfo tmp; SWalFileInfo tmp;
tmp.firstVer = ver; tmp.firstVer = ver;
// find files safe to delete // find files safe to delete
WalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (ver >= pInfo->lastVer) { if (ver >= pInfo->lastVer) {
pInfo++; pInfo++;
} }
// iterate files, until the searched result // iterate files, until the searched result
for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) { if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) {
// delete according to file size or close time // delete according to file size or close time
deleteCnt++; deleteCnt++;
@ -173,7 +173,7 @@ int32_t walEndSnapshot(SWal *pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
// remove file // remove file
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
WalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr); remove(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
@ -186,7 +186,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->vers.firstVer = -1; pWal->vers.firstVer = -1;
} else { } else {
pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
; ;
@ -248,9 +248,9 @@ int walRoll(SWal *pWal) {
} }
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
WalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry)); int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(WalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
// TODO truncate // TODO truncate
return -1; return -1;
} }