[td-10564] merge 3.0

This commit is contained in:
Haojun Liao 2021-12-14 15:16:41 +08:00
commit c8ffc27e7e
42 changed files with 1296 additions and 887 deletions

View File

@ -10,7 +10,7 @@ set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake")
set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib") set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include(${CMAKE_SUPPORT_DIR}/cmake.options) include(${CMAKE_SUPPORT_DIR}/cmake.options)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
# contrib # contrib
add_subdirectory(contrib) add_subdirectory(contrib)
@ -20,6 +20,10 @@ add_library(api INTERFACE)
target_include_directories(api INTERFACE "include/client") target_include_directories(api INTERFACE "include/client")
# src # src
if(${BUILD_TEST})
include(CTest)
enable_testing()
endif(${BUILD_TEST})
add_subdirectory(source) add_subdirectory(source)
# docs # docs

View File

@ -36,6 +36,8 @@ typedef struct SVnodeCfg {
struct { struct {
/** write buffer size */ /** write buffer size */
uint64_t wsize; uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
/** use heap allocator or arena allocator */ /** use heap allocator or arena allocator */
bool isHeapAllocator; bool isHeapAllocator;
}; };

View File

@ -162,7 +162,7 @@ typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
uint32_t size; uint32_t size;
char *pData; char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks; } SVgDataBlocks;
typedef struct SInsertStmtInfo { typedef struct SInsertStmtInfo {

View File

@ -32,6 +32,23 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
#pragma pack(push,1)
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0, TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
@ -43,8 +60,9 @@ typedef struct SWalReadHead {
uint8_t msgType; uint8_t msgType;
int8_t reserved[2]; int8_t reserved[2];
int32_t len; int32_t len;
//int64_t ingestTs; //not implemented
int64_t version; int64_t version;
char cont[]; char body[];
} SWalReadHead; } SWalReadHead;
typedef struct { typedef struct {
@ -52,9 +70,9 @@ typedef struct {
int32_t fsyncPeriod; // millisecond int32_t fsyncPeriod; // millisecond
int32_t retentionPeriod; // secs int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs int32_t rollPeriod; // secs
int32_t retentionSize; // secs int64_t retentionSize;
int64_t segSize; int64_t segSize;
EWalType walLevel; // wal level EWalType level; // wal level
} SWalCfg; } SWalCfg;
typedef struct { typedef struct {
@ -71,64 +89,48 @@ typedef struct {
SWalReadHead head; SWalReadHead head;
} SWalHead; } SWalHead;
#define WAL_PREFIX "wal" typedef struct SWalVer {
#define WAL_PREFIX_LEN 3 int64_t firstVer;
#define WAL_NOSUFFIX_LEN 20 int64_t verInSnapshotting;
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) int64_t snapshotVer;
#define WAL_LOG_SUFFIX "log" int64_t commitVer;
#define WAL_INDEX_SUFFIX "idx" int64_t lastVer;
#define WAL_REFRESH_MS 1000 } SWalVer;
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
typedef struct SWal { typedef struct SWal {
// cfg // cfg
int32_t vgId; SWalCfg cfg;
int32_t fsyncPeriod; // millisecond SWalVer vers;
int32_t rollPeriod; // second
int64_t segSize;
int64_t retentionSize;
int32_t retentionPeriod;
EWalType level;
//total size
int64_t totSize;
//fsync seq
int32_t fsyncSeq;
//reference
int64_t refId;
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//wal lifecycle
int64_t firstVersion;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
//snapshotting version
int64_t snapshottingVer;
//roll status
int64_t lastRollSeq;
//file set //file set
int32_t writeCur; int32_t writeCur;
int64_t writeLogTfd;
int64_t writeIdxTfd;
SArray* fileInfoSet; SArray* fileInfoSet;
//ctl //ctl
int32_t curStatus; int32_t curStatus;
int32_t fsyncSeq;
int64_t totSize;
int64_t refId;
int64_t lastRollSeq;
pthread_mutex_t mutex; pthread_mutex_t mutex;
//path //path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
//reusable write head //reusable write head
SWalHead head; SWalHead writeHead;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal* pWal;
int64_t readLogTfd;
int64_t readIdxTfd;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; //if cursor valid
SWalHead* pHead;
} SWalReadHandle;
#pragma pack(pop)
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization // module initialization
@ -154,6 +156,10 @@ int32_t walEndTakeSnapshot(SWal *);
//int32_t walDataCorrupted(SWal*); //int32_t walDataCorrupted(SWal*);
// read // read
SWalReadHandle* walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);

169
include/util/tdlist.h Normal file
View File

@ -0,0 +1,169 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_TDLIST_H_
#define _TD_UTIL_TDLIST_H_
#ifdef __cplusplus
extern "C" {
#endif
// Single linked list
#define TD_SLIST_NODE(TYPE) \
struct { \
struct TYPE *sl_next_; \
}
#define TD_SLIST(TYPE) \
struct { \
struct TYPE *sl_head_; \
int sl_neles_; \
}
#define TD_SLIST_HEAD(sl) ((sl)->sl_head_)
#define TD_SLIST_NELES(sl) ((sl)->sl_neles_)
#define TD_SLIST_NODE_NEXT(sln) ((sln)->sl_next_)
#define tSListInit(sl) \
do { \
(sl)->sl_head_ = NULL; \
(sl)->sl_neles_ = 0; \
} while (0)
#define tSListPush(sl, sln) \
do { \
TD_SLIST_NODE_NEXT(sln) = TD_SLIST_HEAD(sl); \
TD_SLIST_HEAD(sl) = (sln); \
TD_SLIST_NELES(sl) += 1; \
} while (0)
#define tSListPop(sl) \
do { \
TD_SLIST_HEAD(sl) = TD_SLIST_NODE_NEXT(TD_SLIST_HEAD(sl)); \
TD_SLIST_NELES(sl) -= 1; \
} while (0)
// Double linked list
#define TD_DLIST_NODE(TYPE) \
struct { \
TYPE *dl_prev_; \
TYPE *dl_next_; \
}
#define TD_DLIST(TYPE) \
struct { \
struct TYPE *dl_head_; \
struct TYPE *dl_tail_; \
int dl_neles_; \
}
#define TD_DLIST_NODE_PREV(dln) ((dln)->dl_prev_)
#define TD_DLIST_NODE_NEXT(dln) ((dln)->dl_next_)
#define TD_DLIST_HEAD(dl) ((dl)->dl_head_)
#define TD_DLIST_TAIL(dl) ((dl)->dl_tail_)
#define TD_DLIST_NELES(dl) ((dl)->dl_neles_)
#define tDListInit(dl) \
do { \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = NULL; \
TD_DLIST_NELES(dl) = 0; \
} while (0)
#define tDListAppend(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == NULL) { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \
} else { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_TAIL(dl); \
TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_NODE_NEXT(TD_DLIST_TAIL(dl)) = (dln); \
TD_DLIST_TAIL(dl) = (dln); \
} \
TD_DLIST_NELES(dl) += 1; \
} while (0)
#define tDListPrepend(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == NULL) { \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \
} else { \
TD_DLIST_NODE_PREV(dln) = NULL; \
TD_DLIST_NODE_NEXT(dln) = TD_DLIST_HEAD(dl); \
TD_DLIST_NODE_PREV(TD_DLIST_HEAD(dl)) = (dln); \
TD_DLIST_HEAD(dl) = (dln); \
} \
TD_DLIST_NELES(dl) += 1; \
} while (0)
#define tDListPop(dl, dln) \
do { \
if (TD_DLIST_HEAD(dl) == (dln)) { \
TD_DLIST_HEAD(dl) = TD_DLIST_NODE_NEXT(dln); \
} \
if (TD_DLIST_TAIL(dl) == (dln)) { \
TD_DLIST_TAIL(dl) = TD_DLIST_NODE_PREV(dln); \
} \
if (TD_DLIST_NODE_PREV(dln) != NULL) { \
TD_DLIST_NODE_NEXT(TD_DLIST_NODE_PREV(dln)) = TD_DLIST_NODE_NEXT(dln); \
} \
if (TD_DLIST_NODE_NEXT(dln) != NULL) { \
TD_DLIST_NODE_PREV(TD_DLIST_NODE_NEXT(dln)) = TD_DLIST_NODE_PREV(dln); \
} \
TD_DLIST_NELES(dl) -= 1; \
TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \
} while (0)
#if 0
// List iterator
#define TD_LIST_FITER 0
#define TD_LIST_BITER 1
#define TD_LIST_ITER(S) \
struct { \
int it_dir_; \
S * it_next_; \
S * it_ptr_; \
TD_DLIST(S) * it_list_; \
}
#define tlistIterInit(it, l, dir) \
(it)->it_dir_ = (dir); \
(it)->it_list_ = l; \
if ((dir) == TD_LIST_FITER) { \
(it)->it_next_ = (l)->dl_head_; \
} else { \
(it)->it_next_ = (l)->dl_tail_; \
}
#define tlistIterNext(it) \
({ \
(it)->it_ptr_ = (it)->it_next_; \
if ((it)->it_next_ != NULL) { \
if ((it)->it_dir_ == TD_LIST_FITER) { \
(it)->it_next_ = (it)->it_next_->next_; \
} else { \
(it)->it_next_ = (it)->it_next_->prev_; \
} \
} \
(it)->it_ptr_; \
})
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_TDLIST_H_*/

44
include/util/tmacro.h Normal file
View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MACRO_H_
#define _TD_UTIL_MACRO_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// Module init/clear MACRO definitions
#define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
#define TD_DEF_MOD_INIT_FLAG(MOD) static int8_t MOD##InitFlag = TD_MOD_UNINITIALIZED
#define TD_DEF_MOD_CLEAR_FLAG(MOD) static int8_t MOD##ClearFlag = TD_MOD_UNCLEARD
#define TD_CHECK_AND_SET_MODE_INIT(MOD) \
atomic_val_compare_exchange_8(&(MOD##InitFlag), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(MOD) atomic_val_compare_exchange_8(&(MOD##ClearFlag), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_MACRO_H_*/

View File

@ -271,7 +271,6 @@ TEST_F(DndTestDnode, DropDnode_01) {
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
CheckInt16(1); CheckInt16(1);
@ -363,7 +362,7 @@ TEST_F(DndTestDnode, CreateDnode_02) {
} }
TEST_F(DndTestDnode, RestartDnode_01) { TEST_F(DndTestDnode, RestartDnode_01) {
uInfo("===> stop all server"); uInfo("stop all server");
stopServer(pServer1); stopServer(pServer1);
stopServer(pServer2); stopServer(pServer2);
stopServer(pServer3); stopServer(pServer3);
@ -375,14 +374,16 @@ TEST_F(DndTestDnode, RestartDnode_01) {
pServer4 = NULL; pServer4 = NULL;
pServer5 = NULL; pServer5 = NULL;
taosMsleep(3000); // wait tcp port cleanedup uInfo("start all server");
uInfo("===> start all server");
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9521"; const char* firstEp = "localhost:9521";
pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp); pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
uInfo("===> all server is running"); uInfo("all server is running");
// taosMsleep(1300); // taosMsleep(1300);
// SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); // SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);

View File

@ -17,43 +17,40 @@
class DndTestUser : public ::testing::Test { class DndTestUser : public ::testing::Test {
protected: protected:
void SetUp() override {} static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
void TearDown() override {} SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() { static void SetUpTestSuite() {
const char* user = "root"; initLog("/tmp/dndTestUser");
const char* pass = "taosdata";
const char* path = "/tmp/dndTestUser";
const char* fqdn = "localhost";
uint16_t port = 9524;
const char* firstEp = "localhost:9524";
pServer = createServer(path, fqdn, port, firstEp); const char* fqdn = "localhost";
ASSERT(pServer); const char* firstEp = "localhost:9530";
pClient = createClient(user, pass, fqdn, port); pServer = CreateServer("/tmp/dndTestUser", fqdn, 9530, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9530);
taosMsleep(300);
} }
static void TearDownTestSuite() { static void TearDownTestSuite() {
stopServer(pServer); stopServer(pServer);
dropClient(pClient); dropClient(pClient);
pServer = NULL;
pClient = NULL;
} }
static SServer* pServer; static SServer* pServer;
static SClient* pClient; static SClient* pClient;
static int32_t connId; static int32_t connId;
};
SServer* DndTestUser::pServer; public:
SClient* DndTestUser::pClient; void SetUp() override {}
int32_t DndTestUser::connId; void TearDown() override {}
#if 0 void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
TEST_F(DndTestUser, ShowUser) {
int32_t showId = 0;
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER; pShow->type = showType;
strcpy(pShow->db, ""); strcpy(pShow->db, "");
SRpcMsg showRpcMsg = {0}; SRpcMsg showRpcMsg = {0};
@ -63,66 +60,45 @@ TEST_F(DndTestUser, ShowUser) {
sendMsg(pClient, &showRpcMsg); sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr); ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr); ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId); pShowRsp->showId = htonl(pShowRsp->showId);
STableMetaMsg* pMeta = &pShowRsp->tableMeta; pMeta = &pShowRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen); pMeta->numOfTags = htons(pMeta->numOfTags);
pMeta->numOfColumns = htons(pMeta->numOfColumns); pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion); pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion); pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid); pMeta->tuid = htobe64(pMeta->tuid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid); pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId; showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0); EXPECT_NE(pShowRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0); EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_STREQ(pMeta->tbFname, "show users");
EXPECT_EQ(pMeta->numOfTags, 0); EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0); EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0); EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 4); EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0); EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0); EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0); EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0); EXPECT_EQ(pMeta->suid, 0);
}
SSchema* pSchema = NULL; void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, "name"); EXPECT_STREQ(pSchema->name, name);
}
pSchema = &pMeta->pSchema[1]; void SendThenCheckShowRetrieveMsg(int32_t rows) {
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, 10 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "privilege");
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "create_time");
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "account");
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId); pRetrieve->showId = htonl(showId);
pRetrieve->free = 0; pRetrieve->free = 0;
@ -133,86 +109,83 @@ TEST_F(DndTestUser, ShowUser) {
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg); sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr); ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0); ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr); ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset); pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, 2); EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0); EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0); EXPECT_EQ(pRetrieveRsp->useconds, 0);
EXPECT_EQ(pRetrieveRsp->completed, 1); // EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0); EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0); EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0); EXPECT_EQ(pRetrieveRsp->compLen, 0);
char* pData = pRetrieveRsp->data; pData = pRetrieveRsp->data;
int32_t pos = 0; pos = 0;
char* strVal = NULL;
int64_t int64Val = 0;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
} }
//--- privilege --- void CheckInt16(int16_t val) {
{ int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(VarDataLenT); pos += sizeof(int16_t);
strVal = (char*)(pData + pos); EXPECT_EQ(data, val);
pos += 10;
EXPECT_STREQ(strVal, "super");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += 10;
EXPECT_STREQ(strVal, "writable");
} }
//--- create_time --- void CheckInt64(int64_t val) {
{ int64_t data = *((int64_t*)(pData + pos));
int64Val = *((int64_t*)(pData + pos));
pos += sizeof(int64_t); pos += sizeof(int64_t);
EXPECT_GT(int64Val, 0); EXPECT_EQ(data, val);
}
int64Val = *((int64_t*)(pData + pos)); void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t); pos += sizeof(int64_t);
EXPECT_GT(int64Val, 0); EXPECT_GT(data, 0);
} }
//--- account --- void CheckBinary(const char* val, int32_t len) {
{
pos += sizeof(VarDataLenT); pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos); char* data = (char*)(pData + pos);
pos += TSDB_USER_LEN; pos += len;
EXPECT_STREQ(strVal, "root"); EXPECT_STREQ(data, val);
}
pos += sizeof(VarDataLenT); int32_t showId;
strVal = (char*)(pData + pos); STableMetaMsg* pMeta;
pos += TSDB_USER_LEN; SRetrieveTableRsp* pRetrieveRsp;
EXPECT_STREQ(strVal, "root"); char* pData;
int32_t pos;
};
SServer* DndTestUser::pServer;
SClient* DndTestUser::pClient;
int32_t DndTestUser::connId;
TEST_F(DndTestUser, ShowUser) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege");
CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("super", 10);
CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN);
} }
}
#endif
TEST_F(DndTestUser, CreateUser_01) { TEST_F(DndTestUser, CreateUser_01) {
ASSERT_NE(pClient, nullptr); {
//--- create user ---
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
strcpy(pReq->user, "u1"); strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p1"); strcpy(pReq->pass, "p1");
@ -226,61 +199,41 @@ TEST_F(DndTestUser, CreateUser_01) {
SRpcMsg* pMsg = pClient->pRsp; SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "u1");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
} }
{
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
strcpy(pReq->user, "u2");
strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(3);
CheckBinary("u1", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
CheckBinary("normal", 10);
CheckBinary("super", 10);
CheckBinary("normal", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
} }
TEST_F(DndTestUser, AlterUser_01) { TEST_F(DndTestUser, AlterUser_01) {
ASSERT_NE(pClient, nullptr);
//--- drop user ---
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg)); SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
strcpy(pReq->user, "u1"); strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p2"); strcpy(pReq->pass, "p2");
@ -295,60 +248,23 @@ TEST_F(DndTestUser, AlterUser_01) {
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
//--- meta --- SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); SendThenCheckShowRetrieveMsg(3);
pShow->type = TSDB_MGMT_TABLE_USER; CheckBinary("u1", TSDB_USER_LEN);
SRpcMsg showRpcMsg = {0}; CheckBinary("root", TSDB_USER_LEN);
showRpcMsg.pCont = pShow; CheckBinary("u2", TSDB_USER_LEN);
showRpcMsg.contLen = sizeof(SShowMsg); CheckBinary("normal", 10);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; CheckBinary("super", 10);
CheckBinary("normal", 10);
sendMsg(pClient, &showRpcMsg); CheckTimestamp();
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; CheckTimestamp();
STableMetaMsg* pMeta = &pShowRsp->tableMeta; CheckTimestamp();
pMeta->numOfColumns = htons(pMeta->numOfColumns); CheckBinary("root", TSDB_USER_LEN);
EXPECT_EQ(pMeta->numOfColumns, 4); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "u1");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
} }
TEST_F(DndTestUser, DropUser_01) { TEST_F(DndTestUser, DropUser_01) {
ASSERT_NE(pClient, nullptr);
//--- drop user ---
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg)); SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
strcpy(pReq->user, "u1"); strcpy(pReq->user, "u1");
@ -362,47 +278,38 @@ TEST_F(DndTestUser, DropUser_01) {
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
//--- meta --- SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); SendThenCheckShowRetrieveMsg(2);
pShow->type = TSDB_MGMT_TABLE_USER; CheckBinary("root", TSDB_USER_LEN);
SRpcMsg showRpcMsg = {0}; CheckBinary("u2", TSDB_USER_LEN);
showRpcMsg.pCont = pShow; CheckBinary("super", 10);
showRpcMsg.contLen = sizeof(SShowMsg); CheckBinary("normal", 10);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; CheckTimestamp();
CheckTimestamp();
sendMsg(pClient, &showRpcMsg); CheckBinary("root", TSDB_USER_LEN);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; CheckBinary("root", TSDB_USER_LEN);
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 2);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
} }
TEST_F(DndTestUser, RestartDnode) {
stopServer(pServer);
pServer = NULL;
uInfo("start all server");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9530";
pServer = startServer("/tmp/dndTestUser", fqdn, 9530, firstEp);
uInfo("all server is running");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(2);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
CheckBinary("super", 10);
CheckBinary("normal", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
} }

View File

@ -83,9 +83,11 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
return -1; return -1;
} }
#if 0
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1; return -1;
} }
#endif
return 0; return 0;
} }
@ -459,7 +461,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time"); strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
@ -506,12 +508,9 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
if (pUser->superUser) { if (pUser->superUser) {
const char *src = "super"; const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
// } else if (pUser->writeAuth) { } else {
// const char *src = "writable"; const char *src = "normal";
// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
// } else {
// const char *src = "readable";
// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} }
cols++; cols++;

View File

@ -52,10 +52,10 @@ SSdb *sdbInit(SSdbOpt *pOption) {
void sdbCleanup(SSdb *pSdb) { void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb"); mDebug("start to cleanup sdb");
if (pSdb->curVer != pSdb->lastCommitVer) { // if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb); sdbWriteFile(pSdb);
} // }
if (pSdb->currDir != NULL) { if (pSdb->currDir != NULL) {
tfree(pSdb->currDir); tfree(pSdb->currDir);

View File

@ -18,6 +18,6 @@ target_link_libraries(
) )
# test # test
#if(${BUILD_TEST}) if(${BUILD_TEST})
# add_subdirectory(test) add_subdirectory(test)
#endif(${BUILD_TEST}) endif(${BUILD_TEST})

View File

@ -28,6 +28,7 @@ typedef struct SVBufPool SVBufPool;
int vnodeOpenBufPool(SVnode *pVnode); int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseBufPool(SVnode *pVnode); void vnodeCloseBufPool(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size); void *vnodeMalloc(SVnode *pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,7 +22,7 @@
extern "C" { extern "C" {
#endif #endif
bool vnodeShouldCommit(SVnode *pVnode); #define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeAsyncCommit(SVnode *pVnode); int vnodeAsyncCommit(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -18,15 +18,17 @@
#include "mallocator.h" #include "mallocator.h"
#include "sync.h" #include "sync.h"
#include "tcoding.h"
#include "tdlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "wal.h" #include "wal.h"
#include "tcoding.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeBufferPool.h" #include "vnodeBufferPool.h"
#include "vnodeCfg.h" #include "vnodeCfg.h"
#include "vnodeCommit.h" #include "vnodeCommit.h"
#include "vnodeFS.h" #include "vnodeFS.h"
#include "vnodeMemAllocator.h"
#include "vnodeRequest.h" #include "vnodeRequest.h"
#include "vnodeStateMgr.h" #include "vnodeStateMgr.h"
#include "vnodeSync.h" #include "vnodeSync.h"

View File

@ -16,15 +16,37 @@
#ifndef _TD_VNODE_MEM_ALLOCATOR_H_ #ifndef _TD_VNODE_MEM_ALLOCATOR_H_
#define _TD_VNODE_MEM_ALLOCATOR_H_ #define _TD_VNODE_MEM_ALLOCATOR_H_
#include "mallocator.h" #include "os.h"
#include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode); typedef struct SVArenaNode SVArenaNode;
void vnodeDestroyMemAllocator(SMemAllocator *pma); typedef struct SVMemAllocator SVMemAllocator;
struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void * ptr;
char data[];
};
struct SVMemAllocator {
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode *pNode;
TD_SLIST(SVArenaNode) nlist;
};
SVMemAllocator *vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator *pVMA);
void vmaReset(SVMemAllocator *pVMA);
void * vmaMalloc(SVMemAllocator *pVMA, uint64_t size);
void vmaFree(SVMemAllocator *pVMA, void *ptr);
bool vmaIsFull(SVMemAllocator *pVMA);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -0,0 +1,117 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
static SVArenaNode *vArenaNodeNew(uint64_t capacity);
static void vArenaNodeFree(SVArenaNode *pNode);
SVMemAllocator *vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize) {
SVMemAllocator *pVMA = (SVMemAllocator *)malloc(sizeof(*pVMA));
if (pVMA == NULL) {
return NULL;
}
pVMA->capacity = capacity;
pVMA->ssize = ssize;
pVMA->lsize = lsize;
tSListInit(&(pVMA->nlist));
pVMA->pNode = vArenaNodeNew(capacity);
if (pVMA->pNode == NULL) {
free(pVMA);
return NULL;
}
tSListPush(&(pVMA->nlist), pVMA->pNode);
return pVMA;
}
void vmaDestroy(SVMemAllocator *pVMA) {
if (pVMA) {
while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
tSListPop(&(pVMA->nlist));
vArenaNodeFree(pNode);
}
free(pVMA);
}
}
void vmaReset(SVMemAllocator *pVMA) {
while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
tSListPop(&(pVMA->nlist));
vArenaNodeFree(pNode);
}
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
pNode->ptr = pNode->data;
}
void *vmaMalloc(SVMemAllocator *pVMA, uint64_t size) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
void * ptr;
if (pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + size) {
uint64_t capacity = MAX(pVMA->ssize, size);
pNode = vArenaNodeNew(capacity);
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
tSListPush(&(pVMA->nlist), pNode);
}
ptr = pNode->ptr;
pNode->ptr = POINTER_SHIFT(ptr, size);
return ptr;
}
void vmaFree(SVMemAllocator *pVMA, void *ptr) {
// TODO
}
bool vmaIsFull(SVMemAllocator *pVMA) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
return (TD_SLIST_NELES(&(pVMA->nlist)) > 1) ||
(pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + pVMA->lsize);
}
/* ------------------------ STATIC METHODS ------------------------ */
static SVArenaNode *vArenaNodeNew(uint64_t capacity) {
SVArenaNode *pNode = NULL;
pNode = (SVArenaNode *)malloc(sizeof(*pNode) + capacity);
if (pNode == NULL) {
return NULL;
}
pNode->size = capacity;
pNode->ptr = pNode->data;
return pNode;
}
static void vArenaNodeFree(SVArenaNode *pNode) {
if (pNode) {
free(pNode);
}
}

View File

@ -19,14 +19,90 @@
#define VNODE_BUF_POOL_SHARDS 3 #define VNODE_BUF_POOL_SHARDS 3
struct SVBufPool { struct SVBufPool {
// buffer pool impl TD_DLIST(SVMemAllocator) free;
SList free; TD_DLIST(SVMemAllocator) incycle;
SList incycle; SVMemAllocator *inuse;
SListNode *inuse;
// MAF for submodules // MAF for submodules
SMemAllocatorFactory maf; // SMemAllocatorFactory maf;
}; };
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
// EVMemAllocatorT type = E_V_ARENA_ALLOCATOR;
if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) {
/* TODO */
return -1;
}
tDListInit(&(pVnode->pBufPool->free));
tDListInit(&(pVnode->pBufPool->incycle));
pVnode->pBufPool->inuse = NULL;
// TODO
capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;
for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
SVMemAllocator *pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize);
if (pVMA == NULL) {
// TODO: handle error
return -1;
}
tDListAppend(&(pVnode->pBufPool->free), pVMA);
}
return 0;
}
void vnodeCloseBufPool(SVnode *pVnode) {
if (pVnode->pBufPool) {
vmaDestroy(pVnode->pBufPool->inuse);
while (true) {
SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->incycle));
if (pVMA == NULL) break;
tDListPop(&(pVnode->pBufPool->incycle), pVMA);
vmaDestroy(pVMA);
}
while (true) {
SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->free));
if (pVMA == NULL) break;
tDListPop(&(pVnode->pBufPool->free), pVMA);
vmaDestroy(pVMA);
}
free(pVnode->pBufPool);
pVnode->pBufPool = NULL;
}
}
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
SVBufPool *pBufPool = pVnode->pBufPool;
if (pBufPool->inuse == NULL) {
while (true) {
// TODO: add sem_wait and sem_post
pBufPool->inuse = TD_DLIST_HEAD(&(pBufPool->free));
if (pBufPool->inuse) {
tDListPop(&(pBufPool->free), pBufPool->inuse);
break;
}
}
}
return vmaMalloc(pBufPool->inuse, size);
}
bool vnodeBufPoolIsFull(SVnode *pVnode) {
if (pVnode->pBufPool->inuse == NULL) return false;
return vmaIsFull(pVnode->pBufPool->inuse);
}
#if 0
typedef enum { typedef enum {
// Heap allocator // Heap allocator
E_V_HEAP_ALLOCATOR = 0, E_V_HEAP_ALLOCATOR = 0,
@ -57,15 +133,6 @@ typedef struct {
SListNode *pNode; SListNode *pNode;
} SVMAWrapper; } SVMAWrapper;
typedef struct {
T_REF_DECLARE()
uint64_t capacity;
EVMemAllocatorT type;
union {
SVHeapAllocator vha;
SVArenaAllocator vaa;
};
} SVMemAllocator;
static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode); static void vBufPoolFreeNode(SListNode *pNode);
@ -73,106 +140,13 @@ static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size); static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size);
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
EVMemAllocatorT type = E_V_ARENA_ALLOCATOR;
if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) {
/* TODO */
return -1;
}
tdListInit(&(pVnode->pBufPool->free), 0);
tdListInit(&(pVnode->pBufPool->incycle), 0);
capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;
if (pVnode->config.isHeapAllocator) {
type = E_V_HEAP_ALLOCATOR;
}
for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
SListNode *pNode = vBufPoolNewNode(capacity, type);
if (pNode == NULL) {
vnodeCloseBufPool(pVnode);
return -1;
}
tdListAppendNode(&(pVnode->pBufPool->free), pNode);
}
pVnode->pBufPool->maf.impl = pVnode;
pVnode->pBufPool->maf.create = vBufPoolCreateMA;
pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA;
return 0;
}
void vnodeCloseBufPool(SVnode *pVnode) {
SListNode *pNode;
if (pVnode->pBufPool) {
// Clear free list
while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) != NULL) {
vBufPoolFreeNode(pNode);
}
// Clear incycle list
while ((pNode = tdListPopHead(&(pVnode->pBufPool->incycle))) != NULL) {
vBufPoolFreeNode(pNode);
}
// Free inuse node
if (pVnode->pBufPool->inuse) {
vBufPoolFreeNode(pVnode->pBufPool->inuse);
}
free(pVnode->pBufPool);
pVnode->pBufPool = NULL;
}
}
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
void *ptr;
if (pVnode->pBufPool->inuse == NULL) {
SListNode *pNode;
while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) {
// todo
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = pNode;
}
SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
return vBufPoolMalloc(pvma, size);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */
pvaa->ssize = ssize;
pvaa->lsize = lsize;
pvaa->inuse = &pvaa->node;
pvaa->node.prev = NULL;
pvaa->node.size = capacity;
pvaa->node.ptr = pvaa->node.data;
}
static void vArenaAllocatorClear(SVArenaAllocator *pvaa) { /* TODO */
while (pvaa->inuse != &(pvaa->node)) {
SVArenaNode *pANode = pvaa->inuse;
pvaa->inuse = pANode->prev;
free(pANode);
}
}
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) { static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) {
SListNode * pNode; SListNode * pNode;
SVMemAllocator *pvma; SVMemAllocator *pvma;
uint64_t msize; uint64_t msize;
uint64_t ssize = 0; // TODO uint64_t ssize = 4096; // TODO
uint64_t lsize = 0; // TODO uint64_t lsize = 1024; // TODO
msize = sizeof(SListNode) + sizeof(SVMemAllocator); msize = sizeof(SListNode) + sizeof(SVMemAllocator);
if (type == E_V_ARENA_ALLOCATOR) { if (type == E_V_ARENA_ALLOCATOR) {
@ -318,3 +292,4 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) {
// tsem_post(); todo: sem_post // tsem_post(); todo: sem_post
} }
} }
#endif

View File

@ -15,7 +15,7 @@
#include "vnodeDef.h" #include "vnodeDef.h"
const SVnodeCfg defaultVnodeOptions = {0}; /* TODO */ const SVnodeCfg defaultVnodeOptions = {.wsize = 16 * 1024 * 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */
void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */ void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */
vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions); vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions);

View File

@ -18,8 +18,6 @@
static int vnodeStartCommit(SVnode *pVnode); static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode);
bool vnodeShouldCommit(SVnode *pVnode) { return false; }
int vnodeAsyncCommit(SVnode *pVnode) { int vnodeAsyncCommit(SVnode *pVnode) {
#if 0 #if 0
if (vnodeStartCommit(pVnode) < 0) { if (vnodeStartCommit(pVnode) < 0) {

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tmacro.h"
#include "vnodeDef.h" #include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
@ -20,8 +21,14 @@ static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
TD_DEF_MOD_INIT_FLAG(vnode);
TD_DEF_MOD_CLEAR_FLAG(vnode);
int vnodeInit() { int vnodeInit() {
// TODO if (TD_CHECK_AND_SET_MODE_INIT(vnode) == TD_MOD_INITIALIZED) {
return 0;
}
if (walInit() < 0) { if (walInit() < 0) {
return -1; return -1;
} }
@ -30,6 +37,10 @@ int vnodeInit() {
} }
void vnodeClear() { void vnodeClear() {
if (TD_CHECK_AND_SET_MOD_CLEAR(vnode) == TD_MOD_CLEARD) {
return;
}
walCleanUp(); walCleanUp();
} }

View File

@ -1,113 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode) {
SMemAllocator *pma = NULL;
/* TODO */
return pma;
}
void vnodeDestroyMemAllocator(SMemAllocator *pma) {
// TODO
}
#if 0
#define VNODE_HEAP_ALLOCATOR 0
#define VNODE_ARENA_ALLOCATOR 1
typedef struct {
uint64_t tsize;
uint64_t used;
} SVHeapAllocator;
typedef struct SVArenaNode {
struct SVArenaNode *prev;
void * nptr;
char data[];
} SVArenaNode;
typedef struct {
SVArenaNode *inuse;
SVArenaNode node;
} SVArenaAllocator;
typedef struct {
int8_t type;
uint64_t tsize;
T_REF_DECLARE()
union {
SVHeapAllocator vha;
SVArenaAllocator vaa;
};
} SVMemAllocator;
SMemAllocator *vnodeCreateMemAllocator(int8_t type, uint64_t tsize, uint64_t ssize /* step size only for arena */) {
SMemAllocator * pma;
uint64_t msize;
SVMemAllocator *pva;
msize = sizeof(*pma) + sizeof(SVMemAllocator);
if (type == VNODE_ARENA_ALLOCATOR) {
msize += tsize;
}
pma = (SMemAllocator *)calloc(1, msize);
if (pma == NULL) {
return NULL;
}
pma->impl = POINTER_SHIFT(pma, sizeof(*pma));
pva = (SVMemAllocator *)(pma->impl);
pva->type = type;
pva->tsize = tsize;
if (type == VNODE_HEAP_ALLOCATOR) {
pma->malloc = NULL;
pma->calloc = NULL;
pma->realloc = NULL;
pma->free = NULL;
pma->usage = NULL;
} else if (type == VNODE_ARENA_ALLOCATOR) {
pma->malloc = NULL;
pma->calloc = NULL;
pma->realloc = NULL;
pma->free = NULL;
pma->usage = NULL;
} else {
ASSERT(0);
}
return pma;
}
void vnodeDestroyMemAllocator(SMemAllocator *pma) {
// TODO
}
void vnodeRefMemAllocator(SMemAllocator *pma) {
// TODO
}
void vnodeUnrefMemAllocator(SMemAllocator *pma) {
// TODO
}
/* ------------------------ Heap Allocator IMPL ------------------------ */
/* ------------------------ Arena Allocator IMPL ------------------------ */
#endif

View File

@ -5,3 +5,8 @@ target_sources(vnodeApiTests
"vnodeApiTests.cpp" "vnodeApiTests.cpp"
) )
target_link_libraries(vnodeApiTests vnode gtest gtest_main) target_link_libraries(vnodeApiTests vnode gtest gtest_main)
add_test(
NAME vnode_api_tests
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/vnodeApiTests
)

View File

@ -1,3 +1,14 @@
/**
* @file vnodeApiTests.cpp
* @author hzcheng (hzcheng@taosdata.com)
* @brief VNODE module API tests
* @version 0.1
* @date 2021-12-13
*
* @copyright Copyright (c) 2021
*
*/
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <iostream> #include <iostream>

View File

@ -75,16 +75,7 @@ typedef struct {
SMemRowInfo *rowInfo; SMemRowInfo *rowInfo;
} SMemRowBuilder; } SMemRowBuilder;
typedef struct SParamInfo {
int32_t idx;
uint8_t type;
uint8_t timePrec;
int16_t bytes;
uint32_t offset;
} SParamInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id int64_t vgId; // virtual group id
@ -100,11 +91,6 @@ typedef struct STableDataBlocks {
STagData tagData; STagData tagData;
SParsedDataColInfo boundColumnInfo; SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo * params;
SMemRowBuilder rowBuilder; SMemRowBuilder rowBuilder;
} STableDataBlocks; } STableDataBlocks;
@ -187,7 +173,7 @@ void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap); int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap);
#endif // TDENGINE_DATABLOCKMGT_H #endif // TDENGINE_DATABLOCKMGT_H

View File

@ -108,7 +108,7 @@ void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
tfree(pColList->colIdxInfo); tfree(pColList->colIdxInfo);
} }
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name, static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) { if (dataBuf == NULL) {
@ -145,7 +145,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId; dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name); // tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL); assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
@ -154,8 +154,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
} }
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
SArray* pBlockList) {
*dataBlocks = NULL; *dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) { if (t1 != NULL) {
@ -163,7 +162,7 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
} }
if (*dataBlocks == NULL) { if (*dataBlocks == NULL) {
int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, name, pTableMeta, dataBlocks); int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
@ -253,23 +252,13 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo
} }
} }
void destroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { void destroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
} }
tfree(pDataBlock->pData); tfree(pDataBlock->pData);
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name);
// taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
if (!pDataBlock->cloned) { if (!pDataBlock->cloned) {
tfree(pDataBlock->params);
// free the refcount for metermeta // free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) { if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta); tfree(pDataBlock->pTableMeta);
@ -277,7 +266,6 @@ void destroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo); destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
} }
tfree(pDataBlock); tfree(pDataBlock);
} }
@ -289,7 +277,7 @@ void* destroyBlockArrayList(SArray* pDataBlockList) {
size_t size = taosArrayGetSize(pDataBlockList); size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i); void* d = taosArrayGetP(pDataBlockList, i);
destroyDataBlock(d, false); destroyDataBlock(d);
} }
taosArrayDestroy(pDataBlockList); taosArrayDestroy(pDataBlockList);
@ -505,7 +493,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
if (pBlocks->numOfRows > 0) { if (pBlocks->numOfRows > 0) {
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList); destroyBlockArrayList(pVnodeDataBlockList);
@ -580,7 +568,6 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
extractTableNameList(pHashObj, freeBlockMap); extractTableNameList(pHashObj, freeBlockMap);
// free the table data blocks; // free the table data blocks;
// pInsertParam->pDataBlocks = pVnodeDataBlockList;
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
tfree(blkKeyInfo.pKeyTuple); tfree(blkKeyInfo.pKeyTuple);

View File

@ -167,11 +167,12 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) {
static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) { static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) {
if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(&pCxt->msg, "invalid table name"); return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
} }
SName name = {0}; SName name = {0};
strndequote(name.tname, pStname->z, pStname->n); strcpy(name.dbname, pCxt->pComCxt->pDbname);
strncpy(name.tname, pStname->z, pStname->n);
taosArrayPush(tableNameList, &name); taosArrayPush(tableNameList, &name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -686,7 +687,6 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
// todo bind param
SSchema *pSchema = &schema[spd->boundedColumns[i]]; SSchema *pSchema = &schema[spd->boundedColumns[i]];
param.schema = pSchema; param.schema = pSchema;
param.compareStat = pBuilder->compareStat; param.compareStat = pBuilder->compareStat;
@ -770,14 +770,6 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
int32_t numOfRows = 0; int32_t numOfRows = 0;
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows)); CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
SParamInfo *param = dataBuf->params + i;
if (param->idx == -1) {
// param->idx = pInsertParam->numOfParams++;
param->offset -= sizeof(SSubmitBlk);
}
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
@ -815,12 +807,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
CHECK_CODE(parseUsingClause(pCxt, &tbnameToken)); CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
} else { } else {
CHECK_CODE(getTableMeta(pCxt, &sToken)); CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
} }
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, NULL/* tbname */, pCxt->pTableMeta, &dataBuf, NULL)); sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
if (TK_LP == sToken.type) { if (TK_LP == sToken.type) {
// pSql -> field1_name, ...) // pSql -> field1_name, ...)
@ -831,6 +823,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if (TK_VALUES == sToken.type) { if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...] // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(parseValuesClause(pCxt, dataBuf)); CHECK_CODE(parseValuesClause(pCxt, dataBuf));
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_INSERT;
continue; continue;
} }
@ -842,6 +835,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
} }
// todo // todo
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
continue; continue;
} }

View File

@ -36,51 +36,67 @@ namespace {
// [...]; // [...];
class InsertTest : public Test { class InsertTest : public Test {
protected: protected:
void setDatabase(const string& db) {
db_ = db;
}
void bind(const char* sql) { void bind(const char* sql) {
reset(); reset();
cxt.pSql = sql; cxt_.sqlLen = strlen(sql);
cxt.sqlLen = strlen(sql); strcpy(sqlBuf_, sql);
sqlBuf_[cxt_.sqlLen] = '\0';
cxt_.pSql = sqlBuf_;
cxt_.pDbname = db_.c_str();
} }
int32_t run() { int32_t run() {
code = parseInsertSql(&cxt, &res); code_ = parseInsertSql(&cxt_, &res_);
if (code != TSDB_CODE_SUCCESS) { if (code_ != TSDB_CODE_SUCCESS) {
cout << "code:" << toString(code) << ", msg:" << errMagBuf << endl; cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
} }
return code; return code_;
} }
SInsertStmtInfo* reslut() { SInsertStmtInfo* reslut() {
return res; return res_;
} }
private: private:
static const int max_err_len = 1024; static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024;
void reset() { void reset() {
memset(&cxt, 0, sizeof(cxt)); memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf, 0, max_err_len); memset(errMagBuf_, 0, max_err_len);
cxt.pMsg = errMagBuf; cxt_.pMsg = errMagBuf_;
cxt.msgLen = max_err_len; cxt_.msgLen = max_err_len;
code = TSDB_CODE_SUCCESS; code_ = TSDB_CODE_SUCCESS;
res = nullptr; res_ = nullptr;
} }
char errMagBuf[max_err_len]; string db_;
SParseContext cxt; char errMagBuf_[max_err_len];
int32_t code; char sqlBuf_[max_sql_len];
SInsertStmtInfo* res; SParseContext cxt_;
int32_t code_;
SInsertStmtInfo* res_;
}; };
// INSERT INTO tb_name VALUES (field1_value, ...) // INSERT INTO tb_name VALUES (field1_value, ...)
TEST_F(InsertTest, simpleTest) { TEST_F(InsertTest, simpleTest) {
bind("insert into .. values (...)"); setDatabase("test");
bind("insert into t1 values (now, 1, \"wxy\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
SInsertStmtInfo* res = reslut(); SInsertStmtInfo* res = reslut();
// todo check // todo check
ASSERT_EQ(res->insertType, TSDB_QUERY_TYPE_INSERT);
// ASSERT_EQ(taosArrayGetSize(res->pDataBlocks), 1);
} }
TEST_F(InsertTest, toleranceTest) { TEST_F(InsertTest, toleranceTest) {
setDatabase("test");
bind("insert into"); bind("insert into");
ASSERT_NE(run(), TSDB_CODE_SUCCESS); ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into t"); bind("insert into t");

View File

@ -17,29 +17,30 @@
#include <iostream> #include <iostream>
namespace {
void generateTestT1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 3)
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10);
builder.done();
}
void generateTestST1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2)
.setPrecision(TSDB_TIME_PRECISION_MILLI).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 10)
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10);
builder.done();
mcs->createSubTable("test", "st1", "st1s1", 1);
mcs->createSubTable("test", "st1", "st1s2", 2);
}
}
void generateMetaData(MockCatalogService* mcs) { void generateMetaData(MockCatalogService* mcs) {
{ generateTestT1(mcs);
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, MockCatalogService::numOfDataTypes) generateTestST1(mcs);
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP);
for (int32_t i = 0; i < MockCatalogService::numOfDataTypes; ++i) {
if (TSDB_DATA_TYPE_NULL == tDataTypes[i].type) {
continue;
}
builder = builder.addColumn("c" + std::to_string(i + 1), tDataTypes[i].type);
}
builder.done();
}
{
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, MockCatalogService::numOfDataTypes, 2)
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(2).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP);
for (int32_t i = 0; i < MockCatalogService::numOfDataTypes; ++i) {
if (TSDB_DATA_TYPE_NULL == tDataTypes[i].type) {
continue;
}
builder = builder.addColumn("c" + std::to_string(i + 1), tDataTypes[i].type);
}
builder.done();
}
mcs->showTables(); mcs->showTables();
} }

View File

@ -19,6 +19,7 @@
#include <iostream> #include <iostream>
#include <map> #include <map>
#include "tname.h"
#include "ttypes.h" #include "ttypes.h"
std::unique_ptr<MockCatalogService> mockCatalogService; std::unique_ptr<MockCatalogService> mockCatalogService;
@ -82,12 +83,26 @@ public:
MockCatalogServiceImpl() { MockCatalogServiceImpl() {
} }
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) { struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) const {
return (struct SCatalog*)0x01; return (struct SCatalog*)0x01;
} }
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) { int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const {
return 0; assert(nullptr != pMetaReq && 1 == taosArrayGetSize(pMetaReq->pTableName));
SName* fullName = (SName*)taosArrayGet(pMetaReq->pTableName, 0);
std::unique_ptr<STableMeta> table;
int32_t code = copyTableMeta(fullName->dbname, fullName->tname, &table);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
std::unique_ptr<SArray> tables((SArray*)taosArrayInit(1, sizeof(STableMeta*)));
if (!tables) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STableMeta* elem = table.release();
taosArrayPush(tables.get(), &elem);
pMetaData->pTableMeta = tables.release();
return TSDB_CODE_SUCCESS;
} }
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) { TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
@ -97,6 +112,15 @@ public:
return *(builder_.get()); return *(builder_.get());
} }
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid) {
std::unique_ptr<STableMeta> table;
if (TSDB_CODE_SUCCESS != copyTableMeta(db, stbname, &table)) {
throw std::runtime_error("copyTableMeta failed");
}
meta_[db][tbname].reset(table.release());
meta_[db][tbname]->uid = id_++;
}
void showTables() const { void showTables() const {
// number of forward fills // number of forward fills
#define NOF(n) ((n) / 2) #define NOF(n) ((n) / 2)
@ -120,20 +144,35 @@ public:
#define SL(sn, in) std::setfill('=') << std::setw((sn) * (SFL + 1) + (in) * (IFL + 1)) << "" << std::setfill(' ') #define SL(sn, in) std::setfill('=') << std::setw((sn) * (SFL + 1) + (in) * (IFL + 1)) << "" << std::setfill(' ')
for (const auto& db : meta_) { for (const auto& db : meta_) {
std::cout << SH("Database") << SH("Table") << SH("Type") << SH("Precision") << IH(std::string("Vgid")) << std::endl; std::cout << "Databse:" << db.first << std::endl;
std::cout << SL(4, 1) << std::endl; std::cout << SH("Table") << SH("Type") << SH("Precision") << IH("Vgid") << std::endl;
std::cout << SL(3, 1) << std::endl;
for (const auto& table : db.second) { for (const auto& table : db.second) {
std::cout << SF(db.first) << SF(table.first) << SF(ttToString(table.second->tableType)) << SF(pToString(table.second->tableInfo.precision)) << IF(table.second->vgId) << std::endl; std::cout << SF(table.first) << SF(ttToString(table.second->tableType)) << SF(pToString(table.second->tableInfo.precision)) << IF(table.second->vgId) << std::endl;
// int16_t numOfFields = table.second->tableInfo.numOfTags + table.second->tableInfo.numOfColumns; }
// for (int16_t i = 0; i < numOfFields; ++i) { std::cout << std::endl;
// const SSchema* schema = table.second->schema + i; }
// std::cout << schema->name << " " << schema->type << " " << schema->bytes << std::endl;
// } for (const auto& db : meta_) {
for (const auto& table : db.second) {
std::cout << "Table:" << table.first << std::endl;
std::cout << SH("Field") << SH("Type") << SH("DataType") << IH("Bytes") << std::endl;
std::cout << SL(3, 1) << std::endl;
int16_t numOfTags = table.second->tableInfo.numOfTags;
int16_t numOfFields = numOfTags + table.second->tableInfo.numOfColumns;
for (int16_t i = 0; i < numOfFields; ++i) {
const SSchema* schema = table.second->schema + i;
std::cout << SF(std::string(schema->name)) << SH(ftToString(i, numOfTags)) << SH(dtToString(schema->type)) << IF(schema->bytes) << std::endl;
}
std::cout << std::endl;
} }
} }
} }
private: private:
typedef std::map<std::string, std::shared_ptr<STableMeta> > TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
std::string ttToString(int8_t tableType) const { std::string ttToString(int8_t tableType) const {
switch (tableType) { switch (tableType) {
case TSDB_SUPER_TABLE: case TSDB_SUPER_TABLE:
@ -160,9 +199,43 @@ private:
} }
} }
std::string dtToString(int8_t type) const {
return tDataTypes[type].name;
}
std::string ftToString(int16_t colid, int16_t numOfTags) const {
return (0 == colid ? "column" : (colid <= numOfTags ? "tag" : "column"));
}
std::shared_ptr<STableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
DbMetaCache::const_iterator it = meta_.find(db);
if (meta_.end() == it) {
return std::shared_ptr<STableMeta>();
}
TableMetaCache::const_iterator tit = it->second.find(tbname);
if (it->second.end() == tit) {
return std::shared_ptr<STableMeta>();
}
return tit->second;
}
int32_t copyTableMeta(const std::string& db, const std::string& tbname, std::unique_ptr<STableMeta>* dst) const {
std::shared_ptr<STableMeta> src = getTableMeta(db, tbname);
if (!src) {
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
dst->reset((STableMeta*)std::calloc(1, len));
if (!dst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(dst->get(), src.get(), len);
return TSDB_CODE_SUCCESS;
}
uint64_t id_; uint64_t id_;
std::unique_ptr<TableBuilder> builder_; std::unique_ptr<TableBuilder> builder_;
std::map<std::string, std::map<std::string, std::shared_ptr<STableMeta> > > meta_; DbMetaCache meta_;
}; };
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) { MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {
@ -171,11 +244,11 @@ MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {
MockCatalogService::~MockCatalogService() { MockCatalogService::~MockCatalogService() {
} }
struct SCatalog* MockCatalogService::getCatalogHandle(const SEpSet* pMgmtEps) { struct SCatalog* MockCatalogService::getCatalogHandle(const SEpSet* pMgmtEps) const {
return impl_->getCatalogHandle(pMgmtEps); return impl_->getCatalogHandle(pMgmtEps);
} }
int32_t MockCatalogService::catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) { int32_t MockCatalogService::catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const {
return impl_->catalogGetMetaData(pCatalog, pMetaReq, pMetaData); return impl_->catalogGetMetaData(pCatalog, pMetaReq, pMetaData);
} }
@ -183,6 +256,10 @@ ITableBuilder& MockCatalogService::createTableBuilder(const std::string& db, con
return impl_->createTableBuilder(db, tbname, tableType, numOfColumns, numOfTags); return impl_->createTableBuilder(db, tbname, tableType, numOfColumns, numOfTags);
} }
void MockCatalogService::createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid) {
impl_->createSubTable(db, stbname, tbname, vgid);
}
void MockCatalogService::showTables() const { void MockCatalogService::showTables() const {
impl_->showTables(); impl_->showTables();
} }

View File

@ -49,9 +49,10 @@ public:
MockCatalogService(); MockCatalogService();
~MockCatalogService(); ~MockCatalogService();
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps); struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) const;
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData); int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const;
ITableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags = 0); ITableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags = 0);
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const; void showTables() const;
private: private:

View File

@ -33,10 +33,12 @@ typedef struct WalFileInfo {
int64_t fileSize; int64_t fileSize;
} WalFileInfo; } WalFileInfo;
#pragma pack(push,1)
typedef struct WalIdxEntry { typedef struct WalIdxEntry {
int64_t ver; int64_t ver;
int64_t offset; int64_t offset;
} WalIdxEntry; } WalIdxEntry;
#pragma pack(pop)
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
@ -78,11 +80,11 @@ static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
} }
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
} }
static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
} }
static inline int walValidHeadCksum(SWalHead* pHead) { static inline int walValidHeadCksum(SWalHead* pHead) {
@ -90,7 +92,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
} }
static inline int walValidBodyCksum(SWalHead* pHead) { static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {

View File

@ -25,15 +25,15 @@
#include <regex.h> #include <regex.h>
int64_t walGetFirstVer(SWal *pWal) { int64_t walGetFirstVer(SWal *pWal) {
return pWal->firstVersion; return pWal->vers.firstVer;
} }
int64_t walGetSnaphostVer(SWal *pWal) { int64_t walGetSnaphostVer(SWal *pWal) {
return pWal->snapshotVersion; return pWal->vers.snapshotVer;
} }
int64_t walGetLastVer(SWal *pWal) { int64_t walGetLastVer(SWal *pWal) {
return pWal->lastVersion; return pWal->vers.lastVer;
} }
int walRollFileInfo(SWal* pWal) { int walRollFileInfo(SWal* pWal) {
@ -42,7 +42,7 @@ int walRollFileInfo(SWal* pWal) {
SArray* pArray = pWal->fileInfoSet; SArray* pArray = pWal->fileInfoSet;
if(taosArrayGetSize(pArray) != 0) { if(taosArrayGetSize(pArray) != 0) {
WalFileInfo *pInfo = taosArrayGetLast(pArray); WalFileInfo *pInfo = taosArrayGetLast(pArray);
pInfo->lastVer = pWal->lastVersion; pInfo->lastVer = pWal->vers.lastVer;
pInfo->closeTs = ts; pInfo->closeTs = ts;
} }
@ -51,7 +51,7 @@ int walRollFileInfo(SWal* pWal) {
if(pNewInfo == NULL) { if(pNewInfo == NULL) {
return -1; return -1;
} }
pNewInfo->firstVer = pWal->lastVersion + 1; pNewInfo->firstVer = pWal->vers.lastVer + 1;
pNewInfo->lastVer = -1; pNewInfo->lastVer = -1;
pNewInfo->createTs = ts; pNewInfo->createTs = ts;
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
@ -74,13 +74,13 @@ char* walMetaSerialize(SWal* pWal) {
return NULL; return NULL;
} }
cJSON_AddItemToObject(pRoot, "meta", pMeta); cJSON_AddItemToObject(pRoot, "meta", pMeta);
sprintf(buf, "%" PRId64, pWal->firstVersion); sprintf(buf, "%" PRId64, pWal->vers.firstVer);
cJSON_AddStringToObject(pMeta, "firstVer", buf); cJSON_AddStringToObject(pMeta, "firstVer", buf);
sprintf(buf, "%" PRId64, pWal->snapshotVersion); sprintf(buf, "%" PRId64, pWal->vers.snapshotVer);
cJSON_AddStringToObject(pMeta, "snapshotVer", buf); cJSON_AddStringToObject(pMeta, "snapshotVer", buf);
sprintf(buf, "%" PRId64, pWal->commitVersion); sprintf(buf, "%" PRId64, pWal->vers.commitVer);
cJSON_AddStringToObject(pMeta, "commitVer", buf); cJSON_AddStringToObject(pMeta, "commitVer", buf);
sprintf(buf, "%" PRId64, pWal->lastVersion); sprintf(buf, "%" PRId64, pWal->vers.lastVer);
cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddStringToObject(pMeta, "lastVer", buf);
cJSON_AddItemToObject(pRoot, "files", pFiles); cJSON_AddItemToObject(pRoot, "files", pFiles);
@ -116,13 +116,13 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pRoot = cJSON_Parse(bytes); pRoot = cJSON_Parse(bytes);
pMeta = cJSON_GetObjectItem(pRoot, "meta"); pMeta = cJSON_GetObjectItem(pRoot, "meta");
pField = cJSON_GetObjectItem(pMeta, "firstVer"); pField = cJSON_GetObjectItem(pMeta, "firstVer");
pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "commitVer"); pField = cJSON_GetObjectItem(pMeta, "commitVer");
pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "lastVer"); pField = cJSON_GetObjectItem(pMeta, "lastVer");
pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField));
pFiles = cJSON_GetObjectItem(pRoot, "files"); pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles); int sz = cJSON_GetArraySize(pFiles);
@ -161,7 +161,7 @@ static int walFindCurMetaVer(SWal* pWal) {
DIR *dir = opendir(pWal->path); DIR *dir = opendir(pWal->path);
if(dir == NULL) { if(dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return -1; return -1;
} }

View File

@ -68,9 +68,12 @@ int32_t walInit() {
} }
void walCleanUp() { void walCleanUp() {
int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
if(old == 0) {
return;
}
walStopThread(); walStopThread();
taosCloseRef(tsWal.refSetId); taosCloseRef(tsWal.refSetId);
atomic_store_8(&tsWal.inited, 0);
wInfo("wal module is cleaned up"); wInfo("wal module is cleaned up");
} }
@ -86,21 +89,15 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeCur = -1; pWal->writeCur = -1;
//set config //set config
pWal->vgId = pCfg->vgId; memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->rollPeriod = pCfg->rollPeriod;
pWal->segSize = pCfg->segSize;
pWal->retentionSize = pCfg->retentionSize;
pWal->retentionPeriod = pCfg->retentionPeriod;
pWal->level = pCfg->walLevel;
//init version info //init version info
pWal->firstVersion = -1; pWal->vers.firstVer = -1;
pWal->commitVersion = -1; pWal->vers.commitVer = -1;
pWal->snapshotVersion = -1; pWal->vers.snapshotVer = -1;
pWal->lastVersion = -1; pWal->vers.lastVer = -1;
pWal->snapshottingVer = -1; pWal->vers.verInSnapshotting = -1;
pWal->totSize = 0; pWal->totSize = 0;
@ -108,8 +105,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
//init write buffer //init write buffer
memset(&pWal->head, 0, sizeof(SWalHead)); memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->head.head.sver = 0; pWal->writeHead.head.sver = 0;
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
@ -129,7 +126,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
walReadMeta(pWal); walReadMeta(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
return pWal; return pWal;
} }
@ -137,17 +134,17 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR; if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) {
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
return 0; return 0;
} }
wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
pWal->level = pCfg->walLevel; pWal->cfg.level = pCfg->level;
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
@ -171,22 +168,22 @@ void walClose(SWal *pWal) {
static int32_t walInitObj(SWal *pWal) { static int32_t walInitObj(SWal *pWal) {
if (taosMkDir(pWal->path) != 0) { if (taosMkDir(pWal->path) != 0) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
if(pWal->fileInfoSet == NULL) { if(pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
wDebug("vgId:%d, object is initialized", pWal->vgId); wDebug("vgId:%d, object is initialized", pWal->cfg.vgId);
return 0; return 0;
} }
static void walFreeObj(void *wal) { static void walFreeObj(void *wal) {
SWal *pWal = wal; SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
tfClose(pWal->writeLogTfd); tfClose(pWal->writeLogTfd);
tfClose(pWal->writeIdxTfd); tfClose(pWal->writeIdxTfd);
@ -197,7 +194,7 @@ static void walFreeObj(void *wal) {
} }
static bool walNeedFsync(SWal *pWal) { static bool walNeedFsync(SWal *pWal) {
if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { if (pWal->cfg.fsyncPeriod <= 0 || pWal->cfg.level != TAOS_WAL_FSYNC) {
return false; return false;
} }
@ -217,10 +214,10 @@ static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refSetId, 0); SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->writeLogTfd); int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refSetId, pWal->refId); pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
@ -258,9 +255,8 @@ static int32_t walCreateThread() {
static void walStopThread() { static void walStopThread() {
atomic_store_8(&tsWal.stop, 1); atomic_store_8(&tsWal.stop, 1);
if (tsWal.thread != NULL && taosCheckPthreadValid(tsWal.thread)) { if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL); pthread_join(tsWal.thread, NULL);
tsWal.thread = NULL;
} }
wDebug("wal thread is stopped"); wDebug("wal thread is stopped");

View File

@ -16,6 +16,163 @@
#include "walInt.h" #include "walInt.h"
#include "tfile.h" #include "tfile.h"
SWalReadHandle* walOpenReadHandle(SWal* pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
if(pRead == NULL) {
return NULL;
}
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
pRead->curVersion = -1;
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead));
if(pRead->pHead == NULL) {
free(pRead);
return NULL;
}
return pRead;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
tfree(pRead->pHead);
free(pRead);
}
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0;
int64_t idxTfd = pRead->readIdxTfd;
int64_t logTfd = pRead->readLogTfd;
//seek position
int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code < 0) {
return -1;
}
WalIdxEntry entry;
if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
return -1;
}
//TODO:deserialize
ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) {
return -1;
}
return code;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int64_t logTfd = tfOpenRead(fnameStr);
if(logTfd < 0) {
return -1;
}
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
int64_t idxTfd = tfOpenRead(fnameStr);
if(idxTfd < 0) {
return -1;
}
pRead->readLogTfd = logTfd;
pRead->readIdxTfd = idxTfd;
return 0;
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if(ver == pRead->curVersion) {
return 0;
}
if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
return -1;
}
if(ver < pWal->vers.snapshotVer) {
}
WalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
//bsearch in fileSet
WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
if(pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer);
if(code < 0) {
//TODO: set error flag
return -1;
}
}
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
if(code < 0) {
return -1;
}
pRead->curVersion = ver;
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
//TODO: check wal life
if(pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if(code != 0) {
return -1;
}
}
if(!tfValid(pRead->readLogTfd)) return -1;
code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead));
if(code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(pRead->pHead);
if(code != 0) {
return -1;
}
if(pRead->capacity < pRead->pHead->head.len) {
void* ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
if(ptr == NULL) {
return -1;
}
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len;
}
if(pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
return -1;
}
/*code = walValidBodyCksum(pRead->pHead);*/
ASSERT(pRead->pHead->head.version == ver);
if(code != 0) {
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code; int code;
code = walSeekVer(pWal, ver); code = walSeekVer(pWal, ver);
@ -42,7 +199,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
*ppHead = NULL; *ppHead = NULL;
return -1; return -1;
} }
if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) { if(tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1; return -1;
} }
//TODO: endian compatibility processing after read //TODO: endian compatibility processing after read

View File

@ -78,10 +78,12 @@ int walChangeFile(SWal *pWal, int64_t ver) {
code = tfClose(pWal->writeLogTfd); code = tfClose(pWal->writeLogTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
return -1;
} }
code = tfClose(pWal->writeIdxTfd); code = tfClose(pWal->writeIdxTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
return -1;
} }
WalFileInfo tmpInfo; WalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
@ -106,24 +108,19 @@ int walChangeFile(SWal *pWal, int64_t ver) {
pWal->writeLogTfd = logTfd; pWal->writeLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
return code; return fileFirstVer;
}
int walGetVerOffset(SWal* pWal, int64_t ver) {
int code;
return 0;
} }
int walSeekVer(SWal *pWal, int64_t ver) { int walSeekVer(SWal *pWal, int64_t ver) {
int code; int code;
if(ver == pWal->lastVersion) { if(ver == pWal->vers.lastVer) {
return 0; return 0;
} }
if(ver > pWal->lastVersion || ver < pWal->firstVersion) { if(ver > pWal->vers.lastVer|| ver < pWal->vers.firstVer) {
return -1; return -1;
} }
if(ver < pWal->snapshotVersion) { if(ver < pWal->vers.snapshotVer) {
//TODO: set flag to prevent roll back
} }
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver); code = walChangeFile(pWal, ver);

View File

@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "walInt.h" #include "walInt.h"
#if 0
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
int64_t curFileId = *nextFileId; int64_t curFileId = *nextFileId;
int64_t minFileId = INT64_MAX; int64_t minFileId = INT64_MAX;
@ -116,3 +117,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
return 0; return 0;
} }
#endif

View File

@ -114,22 +114,22 @@ void walRemoveAllOldFiles(void *handle) {
#endif #endif
int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->commitVersion >= pWal->snapshotVersion); ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->commitVersion <= pWal->lastVersion); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if(ver < pWal->commitVersion || ver > pWal->lastVersion) { if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
return -1; return -1;
} }
pWal->commitVersion = ver; pWal->vers.commitVer = ver;
return 0; return 0;
} }
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
int code; int code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if(ver == pWal->lastVersion) { if(ver == pWal->vers.lastVer) {
return 0; return 0;
} }
if(ver > pWal->lastVersion || ver < pWal->commitVersion) { if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
return -1; return -1;
} }
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
@ -220,7 +220,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if(code < 0) { if(code < 0) {
return -1; return -1;
} }
pWal->lastVersion = ver - 1; pWal->vers.lastVer = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
@ -230,9 +230,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
pWal->snapshottingVer = ver; pWal->vers.verInSnapshotting = ver;
//check file rolling //check file rolling
if(pWal->retentionPeriod == 0) { if(pWal->cfg.retentionPeriod == 0) {
walRoll(pWal); walRoll(pWal);
} }
@ -240,10 +240,10 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
} }
int32_t walEndTakeSnapshot(SWal *pWal) { int32_t walEndTakeSnapshot(SWal *pWal) {
int64_t ver = pWal->snapshottingVer; int64_t ver = pWal->vers.verInSnapshotting;
if(ver == -1) return -1; if(ver == -1) return -1;
pWal->snapshotVersion = ver; pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec(); int ts = taosGetTimestampSec();
int deleteCnt = 0; int deleteCnt = 0;
@ -257,8 +257,8 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
} }
//iterate files, until the searched result //iterate files, until the searched result
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->retentionSize || if(pWal->totSize > pWal->cfg.retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) { iter->closeTs + pWal->cfg.retentionPeriod > ts) {
//delete according to file size or close time //delete according to file size or close time
deleteCnt++; deleteCnt++;
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
@ -278,13 +278,13 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->firstVersion = -1; pWal->vers.firstVer = -1;
} else { } else {
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
pWal->totSize = newTotSize; pWal->totSize = newTotSize;
pWal->snapshottingVer = -1; pWal->vers.verInSnapshotting = -1;
//save snapshot ver, commit ver //save snapshot ver, commit ver
int code = walWriteMeta(pWal); int code = walWriteMeta(pWal);
@ -311,7 +311,7 @@ int walRoll(SWal *pWal) {
} }
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
//create new file //create new file
int64_t newFileFirstVersion = pWal->lastVersion + 1; int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr); walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWrite(fnameStr); idxTfd = tfOpenCreateWrite(fnameStr);
@ -357,18 +357,18 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
int code = 0; int code = 0;
// no wal // no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (index == pWal->lastVersion + 1) { if (index == pWal->vers.lastVer + 1) {
if(taosArrayGetSize(pWal->fileInfoSet) == 0) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->firstVersion = index; pWal->vers.firstVer = index;
code = walRoll(pWal); code = walRoll(pWal);
ASSERT(code == 0); ASSERT(code == 0);
} else { } else {
int64_t passed = walGetSeq() - pWal->lastRollSeq; int64_t passed = walGetSeq() - pWal->lastRollSeq;
if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) { if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
walRoll(pWal); walRoll(pWal);
} else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) { } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
walRoll(pWal); walRoll(pWal);
} }
} }
@ -377,35 +377,36 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
//must truncate explicitly first //must truncate explicitly first
return -1; return -1;
} }
/*if (!tfValid(pWal->curLogTfd)) return 0;*/ /*if (!tfValid(pWal->writeLogTfd)) return -1;*/
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
pWal->head.head.version = index; pWal->writeHead.head.version = index;
pWal->head.head.len = bodyLen; int64_t offset = walGetCurFileOffset(pWal);
pWal->head.head.msgType = msgType; pWal->writeHead.head.len = bodyLen;
pWal->head.cksumHead = walCalcHeadCksum(&pWal->head); pWal->writeHead.head.msgType = msgType;
pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
} }
if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
} }
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); code = walWriteIndex(pWal, index, offset);
if(code != 0) { if(code != 0) {
//TODO //TODO
return -1; return -1;
} }
//set status //set status
pWal->lastVersion = index; pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen; pWal->totSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
@ -416,10 +417,10 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
} }
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->writeLogTfd) < 0) { if (tfFsync(pWal->writeLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno));
} }
} }
} }
@ -492,29 +493,29 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
} }
#endif #endif
static int walValidateOffset(SWal* pWal, int64_t ver) { /*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
int code = 0; /*int code = 0;*/
SWalHead *pHead = NULL; /*SWalHead *pHead = NULL;*/
code = (int)walRead(pWal, &pHead, ver); /*code = (int)walRead(pWal, &pHead, ver);*/
if(pHead->head.version != ver) { /*if(pHead->head.version != ver) {*/
return -1; /*return -1;*/
} /*}*/
return 0; /*return 0;*/
} /*}*/
static int64_t walGetOffset(SWal* pWal, int64_t ver) { /*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
int code = walSeekVer(pWal, ver); /*int code = walSeekVer(pWal, ver);*/
if(code != 0) { /*if(code != 0) {*/
return -1; /*return -1;*/
} /*}*/
code = walValidateOffset(pWal, ver); /*code = walValidateOffset(pWal, ver);*/
if(code != 0) { /*if(code != 0) {*/
return -1; /*return -1;*/
} /*}*/
return 0; /*return 0;*/
} /*}*/
#if 0 #if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {

View File

@ -5,6 +5,9 @@
#include "walInt.h" #include "walInt.h"
const char* ranStr = "tvapq02tcp";
const int ranStrLen = strlen(ranStr);
class WalCleanEnv : public ::testing::Test { class WalCleanEnv : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
@ -24,7 +27,7 @@ class WalCleanEnv : public ::testing::Test {
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
@ -56,7 +59,7 @@ class WalCleanDeleteEnv : public ::testing::Test {
memset(pCfg, 0, sizeof(SWalCfg)); memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
@ -95,7 +98,7 @@ class WalKeepEnv : public ::testing::Test {
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
@ -157,29 +160,27 @@ TEST_F(WalCleanEnv, removeOldMeta) {
TEST_F(WalKeepEnv, readOldMeta) { TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv(); walResetEnv();
const char* ranStr = "tvapq02tcp";
int len = strlen(ranStr);
int code; int code;
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len); code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1); ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
char* oldss = walMetaSerialize(pWal); char* oldss = walMetaSerialize(pWal);
TearDown(); TearDown();
SetUp(); SetUp();
ASSERT_EQ(pWal->firstVersion, 0); ASSERT_EQ(pWal->vers.firstVer, 0);
ASSERT_EQ(pWal->lastVersion, 9); ASSERT_EQ(pWal->vers.lastVer, 9);
char* newss = walMetaSerialize(pWal); char* newss = walMetaSerialize(pWal);
len = strlen(oldss); int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss)); ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) { for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]); EXPECT_EQ(oldss[i], newss[i]);
@ -189,72 +190,102 @@ TEST_F(WalKeepEnv, readOldMeta) {
} }
TEST_F(WalCleanEnv, write) { TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code; int code;
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len); code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1); ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walWriteMeta(pWal); code = walWriteMeta(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
TEST_F(WalCleanEnv, rollback) { TEST_F(WalCleanEnv, rollback) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code; int code;
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walRollback(pWal, 5); code = walRollback(pWal, 5);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 4); ASSERT_EQ(pWal->vers.lastVer, 4);
code = walRollback(pWal, 3); code = walRollback(pWal, 3);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 2); ASSERT_EQ(pWal->vers.lastVer, 2);
code = walWriteMeta(pWal); code = walWriteMeta(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
TEST_F(WalCleanDeleteEnv, roll) { TEST_F(WalCleanDeleteEnv, roll) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code; int code;
int i; int i;
for(i = 0; i < 100; i++) { for(i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len); code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i); code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i); ASSERT_EQ(pWal->vers.commitVer, i);
} }
walBeginTakeSnapshot(pWal, i-1); walBeginTakeSnapshot(pWal, i-1);
ASSERT_EQ(pWal->snapshottingVer, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
walEndTakeSnapshot(pWal); walEndTakeSnapshot(pWal);
ASSERT_EQ(pWal->snapshotVersion, i-1); ASSERT_EQ(pWal->vers.snapshotVer, i-1);
ASSERT_EQ(pWal->snapshottingVer, -1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, len); code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0); ASSERT_NE(code, 0);
for(; i < 200; i++) { for(; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len); code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = walCommit(pWal, i); code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i); ASSERT_EQ(pWal->vers.commitVer, i);
} }
//code = walWriteMeta(pWal);
code = walBeginTakeSnapshot(pWal, i - 1); code = walBeginTakeSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = walEndTakeSnapshot(pWal); code = walEndTakeSnapshot(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
ASSERT(pRead != NULL);
int i ;
for(i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
for(int i = 0; i < 1000; i++) {
int ver = rand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
//printf("rrbody: \n");
//for(int i = 0; i < pRead->pHead->head.len; i++) {
//printf("%d ", pRead->pHead->head.body[i]);
//}
//printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver+1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
for(int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
}

View File

@ -250,7 +250,7 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
if(pArray->size == 0) { if(pArray->size == 0) {
return; return;
} }
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize);
} }
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) { void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {

View File

@ -17515,7 +17515,7 @@
fun:gaih_inet.constprop.0 fun:gaih_inet.constprop.0
fun:getaddrinfo fun:getaddrinfo
fun:taosGetFqdn fun:taosGetFqdn
fun:taosCheckGlobalCfg fun:taosCheckAndPrintCfg
fun:taos_init_imp fun:taos_init_imp
} }
{ {
@ -17740,7 +17740,7 @@
fun:gaih_inet.constprop.7 fun:gaih_inet.constprop.7
fun:getaddrinfo fun:getaddrinfo
fun:taosGetFqdn fun:taosGetFqdn
fun:taosCheckGlobalCfg fun:taosCheckAndPrintCfg
fun:taos_init_imp fun:taos_init_imp
} }
{ {