Merge pull request #11788 from taosdata/feature/tq
refacor(wal): rename len to bodyLen
This commit is contained in:
commit
314a92fb76
|
@ -491,7 +491,7 @@ typedef struct {
|
||||||
char intervalUnit;
|
char intervalUnit;
|
||||||
char slidingUnit;
|
char slidingUnit;
|
||||||
char
|
char
|
||||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SQueryNodeAddr;
|
} SQueryNodeAddr;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray* pArray; // Array of SUseDbRsp
|
SArray* pArray; // Array of SUseDbRsp
|
||||||
} SUseDbBatchRsp;
|
} SUseDbBatchRsp;
|
||||||
|
@ -724,7 +723,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||||
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||||
void tFreeSFuncInfo(SFuncInfo *pInfo);
|
void tFreeSFuncInfo(SFuncInfo* pInfo);
|
||||||
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
|
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1289,14 +1288,14 @@ typedef struct {
|
||||||
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
int8_t withSchema;
|
int8_t withSchema;
|
||||||
int8_t withTag;
|
int8_t withTag;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
char subscribeDbName[TSDB_DB_NAME_LEN];
|
char subscribeDbName[TSDB_DB_NAME_LEN];
|
||||||
} SCMCreateTopicReq;
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||||
|
|
|
@ -92,7 +92,7 @@ typedef struct SWalReadHead {
|
||||||
int8_t headVer;
|
int8_t headVer;
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
int16_t msgType;
|
int16_t msgType;
|
||||||
int32_t len;
|
int32_t bodyLen;
|
||||||
int64_t ingestTs; // not implemented
|
int64_t ingestTs; // not implemented
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
|
||||||
|
|
|
@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
|
||||||
bool create = false;
|
bool create = false;
|
||||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
|
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
|
||||||
if (pOffsetObj == NULL) {
|
if (pOffsetObj == NULL) {
|
||||||
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset));
|
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
|
||||||
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
|
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
|
||||||
create = true;
|
create = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||||
assert(walReadWithHandle(pWalHandle, index) == 0);
|
assert(walReadWithHandle(pWalHandle, index) == 0);
|
||||||
|
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.len);
|
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||||
assert(pEntry != NULL);
|
assert(pEntry != NULL);
|
||||||
|
|
||||||
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||||
|
@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
|
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
|
||||||
pEntry->term = pWalHandle->pHead->head.syncMeta.term;
|
pEntry->term = pWalHandle->pHead->head.syncMeta.term;
|
||||||
pEntry->index = index;
|
pEntry->index = index;
|
||||||
assert(pEntry->dataLen == pWalHandle->pHead->head.len);
|
assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
|
||||||
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
|
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
|
||||||
|
|
||||||
// need to hold, do not new every time!!
|
// need to hold, do not new every time!!
|
||||||
walCloseReadHandle(pWalHandle);
|
walCloseReadHandle(pWalHandle);
|
||||||
|
@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
|
||||||
char* serialized = logStoreSimple2Str(pLogStore);
|
char* serialized = logStoreSimple2Str(pLogStore);
|
||||||
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,10 @@
|
||||||
#ifndef _TD_WAL_INT_H_
|
#ifndef _TD_WAL_INT_H_
|
||||||
#define _TD_WAL_INT_H_
|
#define _TD_WAL_INT_H_
|
||||||
|
|
||||||
#include "tcompare.h"
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
#include "tcompare.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -41,10 +41,10 @@ typedef struct WalIdxEntry {
|
||||||
} SWalIdxEntry;
|
} SWalIdxEntry;
|
||||||
|
|
||||||
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
|
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
|
||||||
int tlen;
|
int tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
|
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
|
||||||
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
|
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
|
||||||
return 0;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
|
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
|
||||||
|
@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int walValidBodyCksum(SWalHead* pHead) {
|
static inline int walValidBodyCksum(SWalHead* pHead) {
|
||||||
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody);
|
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
|
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
|
||||||
|
|
|
@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
|
||||||
taosThreadMutexUnlock(&pRead->mutex);
|
taosThreadMutexUnlock(&pRead->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len);
|
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
|
||||||
if (*ppHead == NULL) {
|
if (*ppHead == NULL) {
|
||||||
taosThreadMutexUnlock(&pRead->mutex);
|
taosThreadMutexUnlock(&pRead->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len);
|
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
|
||||||
taosThreadMutexUnlock(&pRead->mutex);
|
taosThreadMutexUnlock(&pRead->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pRead->capacity < pRead->pHead->head.len) {
|
if (pRead->capacity < pRead->pHead->head.bodyLen) {
|
||||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
|
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pRead->pHead = ptr;
|
pRead->pHead = ptr;
|
||||||
pRead->capacity = pRead->pHead->head.len;
|
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||||
}
|
}
|
||||||
if (pRead->pHead->head.len != taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.len)) {
|
|
||||||
|
if (pRead->pHead->head.bodyLen !=
|
||||||
|
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
|
|
||||||
int64_t offset = walGetCurFileOffset(pWal);
|
int64_t offset = walGetCurFileOffset(pWal);
|
||||||
pWal->writeHead.head.len = bodyLen;
|
pWal->writeHead.head.bodyLen = bodyLen;
|
||||||
pWal->writeHead.head.msgType = msgType;
|
pWal->writeHead.head.msgType = msgType;
|
||||||
|
|
||||||
// sync info
|
// sync info
|
||||||
|
|
|
@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
|
||||||
char newStr[100];
|
char newStr[100];
|
||||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||||
int len = strlen(newStr);
|
int len = strlen(newStr);
|
||||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||||
for (int j = 0; j < len; j++) {
|
for (int j = 0; j < len; j++) {
|
||||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||||
}
|
}
|
||||||
|
@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
||||||
char newStr[100];
|
char newStr[100];
|
||||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||||
int len = strlen(newStr);
|
int len = strlen(newStr);
|
||||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||||
for (int j = 0; j < len; j++) {
|
for (int j = 0; j < len; j++) {
|
||||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||||
}
|
}
|
||||||
|
@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
||||||
char newStr[100];
|
char newStr[100];
|
||||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||||
int len = strlen(newStr);
|
int len = strlen(newStr);
|
||||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||||
for (int j = 0; j < len; j++) {
|
for (int j = 0; j < len; j++) {
|
||||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue