diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h
index 7e7afc9774..9620c4ea62 100644
--- a/include/dnode/mnode/sdb/sdb.h
+++ b/include/dnode/mnode/sdb/sdb.h
@@ -144,9 +144,10 @@ typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum {
SDB_STATUS_CREATING = 1,
- SDB_STATUS_READY = 2,
+ SDB_STATUS_UPDATING = 2,
SDB_STATUS_DROPPING = 3,
- SDB_STATUS_DROPPED = 4
+ SDB_STATUS_READY = 4,
+ SDB_STATUS_DROPPED = 5
} ESdbStatus;
typedef enum {
@@ -174,67 +175,19 @@ typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef struct {
- /**
- * @brief The sdb type of the table.
- *
- */
- ESdbType sdbType;
-
- /**
- * @brief The key type of the table.
- *
- */
- EKeyType keyType;
-
- /**
- * @brief The callback function when the table is first deployed.
- *
- */
+ ESdbType sdbType;
+ EKeyType keyType;
SdbDeployFp deployFp;
-
- /**
- * @brief Encode one row of the table into rawdata.
- *
- */
SdbEncodeFp encodeFp;
-
- /**
- * @brief Decode one row of the table from rawdata.
- *
- */
SdbDecodeFp decodeFp;
-
- /**
- * @brief The callback function when insert a row to sdb.
- *
- */
SdbInsertFp insertFp;
-
- /**
- * @brief The callback function when undate a row in sdb.
- *
- */
SdbUpdateFp updateFp;
-
- /**
- * @brief The callback function when delete a row from sdb.
- *
- */
SdbDeleteFp deleteFp;
} SSdbTable;
typedef struct SSdbOpt {
- /**
- * @brief The path of the sdb file.
- *
- */
const char *path;
-
- /**
- * @brief The mnode object.
- *
- */
- SMnode *pMnode;
+ SMnode *pMnode;
} SSdbOpt;
/**
diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h
index 007ce83812..8458ad9da3 100644
--- a/include/dnode/vnode/vnode.h
+++ b/include/dnode/vnode/vnode.h
@@ -68,9 +68,11 @@ typedef struct SVnodeCfg {
/**
* @brief Initialize the vnode module
*
+ * @param nthreads number of commit threads. 0 for no threads and
+ * a schedule queue should be given (TODO)
* @return int 0 for success and -1 for failure
*/
-int vnodeInit();
+int vnodeInit(uint16_t nthreads);
/**
* @brief clear a vnode
diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h
index 1ff3f02da5..8f217a0deb 100644
--- a/include/libs/planner/planner.h
+++ b/include/libs/planner/planner.h
@@ -22,6 +22,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
+#define QUERY_TYPE_SCAN 3
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
@@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E {
struct SEpSet;
struct SQueryPlanNode;
-struct SQueryDistPlanNode;
+struct SPhyNode;
struct SQueryStmtInfo;
-typedef struct SSubquery {
- int64_t queryId; // the subquery id created by qnode
- int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL
- int32_t level; // the execution level of current subquery, starting from 0.
- SArray *pUpstream; // the upstream,from which to fetch the result
- struct SQueryDistPlanNode *pNode; // physical plan of current subquery
-} SSubquery;
-
-typedef struct SQueryJob {
- SArray **pSubqueries;
- int32_t numOfLevels;
- int32_t currentLevel;
-} SQueryJob;
+typedef struct SSubplan {
+ int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
+ SArray *pDatasource; // the datasource subplan,from which to fetch the result
+ struct SPhyNode *pNode; // physical plan of current subplan
+} SSubplan;
+typedef struct SQueryDag {
+ SArray **pSubplans;
+} SQueryDag;
/**
- * Optimize the query execution plan, currently not implement yet.
- * @param pQueryNode
- * @return
+ * Create the physical plan for the query, according to the AST.
*/
-int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
+int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
+
+int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
/**
- * Create the query plan according to the bound AST, which is in the form of pQueryInfo
- * @param pQueryInfo
- * @param pQueryNode
- * @return
+ * Convert to subplan to string for the scheduler to send to the executor
*/
-int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
-
-/**
- * Convert the query plan to string, in order to display it in the shell.
- * @param pQueryNode
- * @return
- */
-int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
-
-/**
- * Restore the SQL statement according to the logic query plan.
- * @param pQueryNode
- * @param sql
- * @return
- */
-int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
-
-/**
- * Create the physical plan for the query, according to the logic plan.
- * @param pQueryNode
- * @param pPhyNode
- * @return
- */
-int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode);
-
-/**
- * Convert to physical plan to string to enable to print it out in the shell.
- * @param pPhyNode
- * @param str
- * @return
- */
-int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str);
-
-/**
- * Destroy the query plan object.
- * @return
- */
-void* qDestroyQueryPlan(struct SQueryPlanNode* pQueryNode);
+int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
-void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode);
-
-/**
- * Create the query job from the physical execution plan
- * @param pPhyNode
- * @param pJob
- * @return
- */
-int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob);
+void* qDestroyQueryDag(struct SQueryDag* pDag);
#ifdef __cplusplus
}
diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h
index d9653046cf..6b3c9ed021 100644
--- a/include/libs/scheduler/scheduler.h
+++ b/include/libs/scheduler/scheduler.h
@@ -20,7 +20,42 @@
extern "C" {
#endif
-struct SQueryJob;
+typedef struct SQueryProfileSummary {
+ int64_t startTs; // Object created and added into the message queue
+ int64_t endTs; // the timestamp when the task is completed
+ int64_t cputime; // total cpu cost, not execute elapsed time
+
+ int64_t loadRemoteDataDuration; // remote io time
+ int64_t loadNativeDataDuration; // native disk io time
+
+ uint64_t loadNativeData; // blocks + SMA + header files
+ uint64_t loadRemoteData; // remote data acquired by exchange operator.
+
+ uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it
+ int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue.
+
+ uint64_t totalRows;
+ uint64_t loadRows;
+ uint32_t totalBlocks;
+ uint32_t loadBlocks;
+ uint32_t loadBlockAgg;
+ uint32_t skipBlocks;
+ uint64_t resultSize; // generated result size in Kb.
+} SQueryProfileSummary;
+
+typedef struct SQueryTask {
+ uint64_t queryId; // query id
+ uint64_t taskId; // task id
+ char *pSubplan; // operator tree
+ uint64_t status; // task status
+ SQueryProfileSummary summary; // task execution summary
+ void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
+} SQueryTask;
+
+typedef struct SQueryJob {
+ SArray **pSubtasks;
+ // todo
+} SQueryJob;
/**
* Process the query job, generated according to the query physical plan.
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index 744275e6ff..e19d65837a 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -32,23 +32,19 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
-#define WAL_PREFIX "wal"
-#define WAL_PREFIX_LEN 3
+#define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
-#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
+#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
-#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
-#define WAL_CUR_POS_WRITABLE 1
-#define WAL_CUR_FILE_WRITABLE 2
-#define WAL_CUR_FAILED 4
+#define WAL_CUR_FAILED 1
-#pragma pack(push,1)
+#pragma pack(push, 1)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
@@ -56,11 +52,11 @@ typedef enum {
} EWalType;
typedef struct SWalReadHead {
- int8_t sver;
+ int8_t headVer;
uint8_t msgType;
int8_t reserved[2];
int32_t len;
- //int64_t ingestTs; //not implemented
+ int64_t ingestTs; //not implemented
int64_t version;
char body[];
} SWalReadHead;
@@ -72,18 +68,10 @@ typedef struct {
int32_t rollPeriod; // secs
int64_t retentionSize;
int64_t segSize;
- EWalType level; // wal level
+ EWalType level; // wal level
} SWalCfg;
typedef struct {
- //union {
- //uint32_t info;
- //struct {
- //uint32_t sver:3;
- //uint32_t msgtype: 5;
- //uint32_t reserved : 24;
- //};
- //};
uint32_t cksumHead;
uint32_t cksumBody;
SWalReadHead head;
@@ -102,16 +90,16 @@ typedef struct SWal {
SWalCfg cfg;
SWalVer vers;
//file set
- int32_t writeCur;
int64_t writeLogTfd;
int64_t writeIdxTfd;
+ int32_t writeCur;
SArray* fileInfoSet;
- //ctl
- int32_t curStatus;
- int32_t fsyncSeq;
+ //statistics
int64_t totSize;
- int64_t refId;
int64_t lastRollSeq;
+ //ctl
+ int32_t fsyncSeq;
+ int64_t refId;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
@@ -131,7 +119,7 @@ typedef struct SWalReadHandle {
} SWalReadHandle;
#pragma pack(pop)
-typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
+//typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
int32_t walInit();
@@ -151,8 +139,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
-int32_t walBeginTakeSnapshot(SWal *, int64_t ver);
-int32_t walEndTakeSnapshot(SWal *);
+int32_t walBeginSnapshot(SWal *, int64_t ver);
+int32_t walEndSnapshot(SWal *);
//int32_t walDataCorrupted(SWal*);
// read
@@ -161,7 +149,7 @@ void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walRead(SWal *, SWalHead **, int64_t ver);
-int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
+//int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
// lifecycle check
int64_t walGetFirstVer(SWal *);
diff --git a/include/util/tdlist.h b/include/util/tdlist.h
index a19f3bebec..d047a57770 100644
--- a/include/util/tdlist.h
+++ b/include/util/tdlist.h
@@ -58,8 +58,8 @@ extern "C" {
// Double linked list
#define TD_DLIST_NODE(TYPE) \
struct { \
- TYPE *dl_prev_; \
- TYPE *dl_next_; \
+ struct TYPE *dl_prev_; \
+ struct TYPE *dl_next_; \
}
#define TD_DLIST(TYPE) \
diff --git a/include/util/tmacro.h b/include/util/tmacro.h
index 74056cfe07..5cca8a1062 100644
--- a/include/util/tmacro.h
+++ b/include/util/tmacro.h
@@ -29,13 +29,11 @@ extern "C" {
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
-#define TD_DEF_MOD_INIT_FLAG(MOD) static int8_t MOD##InitFlag = TD_MOD_UNINITIALIZED
-#define TD_DEF_MOD_CLEAR_FLAG(MOD) static int8_t MOD##ClearFlag = TD_MOD_UNCLEARD
+typedef int8_t td_mode_flag_t;
-#define TD_CHECK_AND_SET_MODE_INIT(MOD) \
- atomic_val_compare_exchange_8(&(MOD##InitFlag), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
+#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
-#define TD_CHECK_AND_SET_MOD_CLEAR(MOD) atomic_val_compare_exchange_8(&(MOD##ClearFlag), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
+#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
diff --git a/include/util/tref.h b/include/util/tref.h
index cc7d075f52..6680204d63 100644
--- a/include/util/tref.h
+++ b/include/util/tref.h
@@ -17,6 +17,8 @@
#ifndef _TD_UTIL_REF_H
#define _TD_UTIL_REF_H
+#include "os.h"
+
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp
index fba3794f6a..580fe8e131 100644
--- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp
+++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp
@@ -379,41 +379,41 @@ TEST_F(DndTestDnode, RestartDnode_01) {
const char* fqdn = "localhost";
const char* firstEp = "localhost:9521";
pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
- // pServer1 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
- // pServer1 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
- // pServer1 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
+ pServer3 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
+ pServer4 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
+ pServer5 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
uInfo("all server is running");
- // taosMsleep(1300);
- // SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
- // SendThenCheckShowRetrieveMsg(4);
- // CheckInt16(1);
- // CheckInt16(3);
- // CheckInt16(4);
- // CheckInt16(5);
- // CheckBinary("localhost:9521", TSDB_EP_LEN);
- // CheckBinary("localhost:9523", TSDB_EP_LEN);
- // CheckBinary("localhost:9524", TSDB_EP_LEN);
- // CheckBinary("localhost:9525", TSDB_EP_LEN);
- // CheckInt16(0);
- // CheckInt16(0);
- // CheckInt16(0);
- // CheckInt16(0);
- // CheckInt16(1);
- // CheckInt16(1);
- // CheckInt16(1);
- // CheckInt16(1);
- // CheckBinary("ready", 10);
- // CheckBinary("ready", 10);
- // CheckBinary("ready", 10);
- // CheckBinary("ready", 10);
- // CheckTimestamp();
- // CheckTimestamp();
- // CheckTimestamp();
- // CheckTimestamp();
- // CheckBinary("", 24);
- // CheckBinary("", 24);
- // CheckBinary("", 24);
- // CheckBinary("", 24);
+ taosMsleep(1300);
+ SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
+ SendThenCheckShowRetrieveMsg(4);
+ CheckInt16(1);
+ CheckInt16(3);
+ CheckInt16(4);
+ CheckInt16(5);
+ CheckBinary("localhost:9521", TSDB_EP_LEN);
+ CheckBinary("localhost:9523", TSDB_EP_LEN);
+ CheckBinary("localhost:9524", TSDB_EP_LEN);
+ CheckBinary("localhost:9525", TSDB_EP_LEN);
+ CheckInt16(0);
+ CheckInt16(0);
+ CheckInt16(0);
+ CheckInt16(0);
+ CheckInt16(1);
+ CheckInt16(1);
+ CheckInt16(1);
+ CheckInt16(1);
+ CheckBinary("ready", 10);
+ CheckBinary("ready", 10);
+ CheckBinary("ready", 10);
+ CheckBinary("ready", 10);
+ CheckTimestamp();
+ CheckTimestamp();
+ CheckTimestamp();
+ CheckTimestamp();
+ CheckBinary("", 24);
+ CheckBinary("", 24);
+ CheckBinary("", 24);
+ CheckBinary("", 24);
}
diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp
index de50899c2d..be1506bccf 100644
--- a/source/dnode/mgmt/impl/test/sut/deploy.cpp
+++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp
@@ -18,7 +18,7 @@
void initLog(const char* path) {
dDebugFlag = 143;
vDebugFlag = 0;
- mDebugFlag = 143;
+ mDebugFlag = 207;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 0;
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 5ddde3181e..0f4839392e 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -73,7 +73,8 @@ typedef enum {
TRN_STAGE_EXECUTE = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_ROLLBACK = 4,
- TRN_STAGE_RETRY = 5
+ TRN_STAGE_RETRY = 5,
+ TRN_STAGE_OVER = 6,
} ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
@@ -103,7 +104,6 @@ typedef struct STrans {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
- SMnode *pMnode;
void *rpcHandle;
SArray *redoLogs;
SArray *undoLogs;
@@ -304,6 +304,7 @@ typedef struct SMnodeMsg {
typedef struct {
int32_t id;
+ int32_t code;
void *rpcHandle;
} STransMsg;
diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h
index 02ba725be1..fe557cdeac 100644
--- a/source/dnode/mnode/impl/inc/mndSync.h
+++ b/source/dnode/mnode/impl/inc/mndSync.h
@@ -25,7 +25,7 @@ extern "C" {
int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode);
-int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg);
+int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h
index 878337e4be..5da1d1ca2b 100644
--- a/source/dnode/mnode/impl/inc/mndTrans.h
+++ b/source/dnode/mnode/impl/inc/mndTrans.h
@@ -32,10 +32,10 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg);
-
-int32_t mndTransPrepare(STrans *pTrans);
+int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
-int32_t mndTransExecute(SSdb *pSdb, int32_t tranId);
+char *mndTransStageStr(ETrnStage stage);
+char *mndTransPolicyStr(ETrnPolicy policy);
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index fd02c6e251..b638728647 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -357,7 +357,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -491,7 +491,7 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -571,7 +571,7 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index cf46d044ca..4ce557fc02 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -413,25 +413,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
mndTransDrop(pTrans);
return -1;
}
- sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
+ sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
- SSdbRaw *pUndoRaw = mndDnodeActionEncode(&dnodeObj);
- if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
- mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
-
- SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj);
- if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
- mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
-
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -485,25 +469,9 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
mndTransDrop(pTrans);
return -1;
}
- sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
+ sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
- SSdbRaw *pUndoRaw = mndDnodeActionEncode(pDnode);
- if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
- mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
-
- SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
- if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
- mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
-
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c
index e407b271fd..3fd7dcfba1 100644
--- a/source/dnode/mnode/impl/src/mndFunc.c
+++ b/source/dnode/mnode/impl/src/mndFunc.c
@@ -183,7 +183,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -226,7 +226,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c
index 6278e3ffef..e91c51d301 100644
--- a/source/dnode/mnode/impl/src/mndMnode.c
+++ b/source/dnode/mnode/impl/src/mndMnode.c
@@ -238,7 +238,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -313,7 +313,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index c3afbf37c8..63bf186be5 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -285,7 +285,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -433,7 +433,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c
index 6e7ee662f8..59161b32f2 100644
--- a/source/dnode/mnode/impl/src/mndSync.c
+++ b/source/dnode/mnode/impl/src/mndSync.c
@@ -21,16 +21,16 @@
int32_t mndInitSync(SMnode *pMnode) { return 0; }
void mndCleanupSync(SMnode *pMnode) {}
-int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg) {
+int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
int32_t code = 0;
- int32_t len = sdbGetRawTotalSize(pRaw);
- SSdbRaw *pReceived = calloc(1, len);
- memcpy(pReceived, pRaw, len);
- mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
+ // int32_t len = sdbGetRawTotalSize(pRaw);
+ // SSdbRaw *pReceived = calloc(1, len);
+ // memcpy(pReceived, pRaw, len);
+ // mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
- mndTransApply(pMnode, pReceived, pMsg, code);
- return 0;
+ // mndTransApply(pMnode, pReceived, code);
+ return code;
}
bool mndIsMaster(SMnode *pMnode) { return true; }
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 3a53472d45..9ab84d6557 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -17,8 +17,9 @@
#include "mndTrans.h"
#include "mndSync.h"
-#define SDB_TRANS_VER 1
-#define TRN_DEFAULT_ARRAY_SIZE 8
+#define TSDB_TRANS_VER 1
+#define TSDB_TRN_ARRAY_SIZE 8
+#define TSDB_TRN_RESERVE_SIZE 64
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
@@ -26,6 +27,22 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
+static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
+static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
+static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw);
+static void mndTransDropArray(SArray *pArray);
+static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray);
+static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
+static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
+static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
+
int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32,
@@ -41,7 +58,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
- int32_t rawDataLen = 16 * sizeof(int32_t);
+ int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
@@ -63,7 +80,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen += sdbGetRawTotalSize(pTmp);
}
- SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
+ SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
if (pRaw == NULL) {
mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
return NULL;
@@ -71,7 +88,6 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id)
- SDB_SET_INT8(pRaw, dataPos, pTrans->stage)
SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
SDB_SET_INT32(pRaw, dataPos, redoLogNum)
SDB_SET_INT32(pRaw, dataPos, undoLogNum)
@@ -100,6 +116,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
}
+ SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
+ SDB_SET_DATALEN(pRaw, dataPos);
mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
return pRaw;
}
@@ -113,7 +131,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return NULL;
}
- if (sver != SDB_TRANS_VER) {
+ if (sver != TSDB_TRANS_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
return NULL;
@@ -126,11 +144,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return NULL;
}
- pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
@@ -147,7 +165,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
- SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->stage)
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
@@ -197,6 +214,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
}
}
+ SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE)
+
TRANS_DECODE_OVER:
if (code != 0) {
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
@@ -210,64 +229,72 @@ TRANS_DECODE_OVER:
}
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
- mTrace("trans:%d, perform insert action, stage:%d", pTrans->id, pTrans->stage);
-
- SArray *pArray = pTrans->redoLogs;
- int32_t arraySize = taosArrayGetSize(pArray);
-
- for (int32_t i = 0; i < arraySize; ++i) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, i);
- int32_t code = sdbWrite(pSdb, pRaw);
- if (code != 0) {
- mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
- return code;
- }
- }
+ pTrans->stage = TRN_STAGE_PREPARE;
+ mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
return 0;
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
- mTrace("trans:%d, perform delete action, stage:%d", pTrans->id, pTrans->stage);
+ mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
- SArray *pArray = pTrans->undoLogs;
- int32_t arraySize = taosArrayGetSize(pArray);
-
- for (int32_t i = 0; i < arraySize; ++i) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, i);
- int32_t code = sdbWrite(pSdb, pRaw);
- if (code != 0) {
- mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
- return code;
- }
- }
+ mndTransDropArray(pTrans->redoLogs);
+ mndTransDropArray(pTrans->undoLogs);
+ mndTransDropArray(pTrans->commitLogs);
+ mndTransDropArray(pTrans->redoActions);
+ mndTransDropArray(pTrans->undoActions);
return 0;
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
- mTrace("trans:%d, perform update action, stage:%d", pOldTrans->id, pNewTrans->stage);
-
- SArray *pArray = pOldTrans->commitLogs;
- int32_t arraySize = taosArrayGetSize(pArray);
-
- for (int32_t i = 0; i < arraySize; ++i) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, i);
- int32_t code = sdbWrite(pSdb, pRaw);
- if (code != 0) {
- mError("trans:%d, failed to write raw:%p to sdb since %s", pOldTrans->id, pRaw, terrstr());
- return code;
- }
- }
-
+ mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage));
pOldTrans->stage = pNewTrans->stage;
return 0;
}
-static int32_t trnGenerateTransId() {
+STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
+ SSdb *pSdb = pMnode->pSdb;
+ return sdbAcquire(pSdb, SDB_TRANS, &transId);
+}
+
+void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
+ SSdb *pSdb = pMnode->pSdb;
+ sdbRelease(pSdb, pTrans);
+}
+
+static int32_t mndGenerateTransId() {
static int32_t tmp = 0;
return ++tmp;
}
+char *mndTransStageStr(ETrnStage stage) {
+ switch (stage) {
+ case TRN_STAGE_PREPARE:
+ return "prepare";
+ case TRN_STAGE_EXECUTE:
+ return "execute";
+ case TRN_STAGE_COMMIT:
+ return "commit";
+ case TRN_STAGE_ROLLBACK:
+ return "rollback";
+ case TRN_STAGE_RETRY:
+ return "retry";
+ default:
+ return "undefined";
+ }
+}
+
+char *mndTransPolicyStr(ETrnPolicy policy) {
+ switch (policy) {
+ case TRN_POLICY_ROLLBACK:
+ return "prepare";
+ case TRN_POLICY_RETRY:
+ return "retry";
+ default:
+ return "undefined";
+ }
+}
+
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) {
@@ -276,16 +303,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return NULL;
}
- pTrans->id = trnGenerateTransId();
+ pTrans->id = mndGenerateTransId();
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
- pTrans->pMnode = pMnode;
pTrans->rpcHandle = rpcHandle;
- pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
@@ -298,7 +324,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return pTrans;
}
-static void trnDropArray(SArray *pArray) {
+static void mndTransDropArray(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
tfree(pRaw);
@@ -308,17 +334,17 @@ static void trnDropArray(SArray *pArray) {
}
void mndTransDrop(STrans *pTrans) {
- trnDropArray(pTrans->redoLogs);
- trnDropArray(pTrans->undoLogs);
- trnDropArray(pTrans->commitLogs);
- trnDropArray(pTrans->redoActions);
- trnDropArray(pTrans->undoActions);
+ mndTransDropArray(pTrans->redoLogs);
+ mndTransDropArray(pTrans->undoLogs);
+ mndTransDropArray(pTrans->commitLogs);
+ mndTransDropArray(pTrans->redoActions);
+ mndTransDropArray(pTrans->undoActions);
mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
tfree(pTrans);
}
-void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
+static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
pTrans->rpcHandle = rpcHandle;
mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
}
@@ -340,19 +366,19 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw);
- mTrace("trans:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code);
+ mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw);
- mTrace("trans:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code);
+ mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw);
- mTrace("trans:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code);
+ mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
@@ -368,7 +394,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return code;
}
-int32_t mndTransPrepare(STrans *pTrans) {
+int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, prepare transaction", pTrans->id);
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
@@ -376,180 +402,295 @@ int32_t mndTransPrepare(STrans *pTrans) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
return -1;
}
- sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
+ sdbSetRawStatus(pRaw, SDB_STATUS_READY);
- if (sdbWriteNotFree(pTrans->pMnode->pSdb, pRaw) != 0) {
- mError("trans:%d, failed to write trans since %s", pTrans->id, terrstr());
- return -1;
- }
-
- STransMsg *pMsg = calloc(1, sizeof(STransMsg));
- pMsg->id = pTrans->id;
- pMsg->rpcHandle = pTrans->rpcHandle;
-
- mDebug("trans:%d, start sync, RPC:%p pMsg:%p", pTrans->id, pTrans->rpcHandle, pMsg);
- if (mndSyncPropose(pTrans->pMnode, pRaw, pMsg) != 0) {
+ mTrace("trans:%d, start sync", pTrans->id);
+ int32_t code = mndSyncPropose(pMnode, pRaw);
+ if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
- free(pMsg);
sdbFreeRaw(pRaw);
return -1;
}
- sdbFreeRaw(pRaw);
+ mTrace("trans:%d, sync finished", pTrans->id);
+
+ code = sdbWrite(pMnode->pSdb, pRaw);
+ if (code != 0) {
+ mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
+ return -1;
+ }
+
+ STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
+ if (pNewTrans == NULL) {
+ mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr());
+ return -1;
+ }
+
+ mDebug("trans:%d, prepare finished", pNewTrans->id);
+ pNewTrans->rpcHandle = pTrans->rpcHandle;
+ mndTransExecute(pMnode, pNewTrans);
+ mndReleaseTrans(pMnode, pNewTrans);
return 0;
}
-static void trnSendRpcRsp(STransMsg *pMsg, int32_t code) {
- mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p", pMsg->id, pMsg->rpcHandle, code & 0xFFFF, pMsg);
- if (pMsg->rpcHandle != NULL) {
- SRpcMsg rspMsg = {.handle = pMsg->rpcHandle, .code = code};
- rpcSendResponse(&rspMsg);
+int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
+ mDebug("trans:%d, commit transaction", pTrans->id);
+
+ SSdbRaw *pRaw = mndTransActionEncode(pTrans);
+ if (pRaw == NULL) {
+ mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
+ return -1;
+ }
+ sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
+
+ if (taosArrayGetSize(pTrans->commitLogs) != 0) {
+ mTrace("trans:%d, start sync", pTrans->id);
+ int32_t code = mndSyncPropose(pMnode, pRaw);
+ if (code != 0) {
+ mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
+ sdbFreeRaw(pRaw);
+ return -1;
+ }
+
+ mTrace("trans:%d, sync finished", pTrans->id);
+ code = sdbWrite(pMnode->pSdb, pRaw);
+ if (code != 0) {
+ mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
+ return -1;
+ }
}
- free(pMsg);
+ mDebug("trans:%d, commit finished", pTrans->id);
+ return 0;
+}
+
+int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
+ mDebug("trans:%d, rollback transaction", pTrans->id);
+
+ SSdbRaw *pRaw = mndTransActionEncode(pTrans);
+ if (pRaw == NULL) {
+ mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
+ return -1;
+ }
+ sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
+
+ mTrace("trans:%d, start sync", pTrans->id);
+ int32_t code = mndSyncPropose(pMnode, pRaw);
+ if (code != 0) {
+ mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
+ sdbFreeRaw(pRaw);
+ return -1;
+ }
+
+ mTrace("trans:%d, sync finished", pTrans->id);
+ code = sdbWrite(pMnode->pSdb, pRaw);
+ if (code != 0) {
+ mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
+ return -1;
+ }
+
+ mDebug("trans:%d, rollback finished", pTrans->id);
+ return 0;
+}
+
+static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
+ if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
+ mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF);
+
+ if (pTrans->rpcHandle != NULL) {
+ SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code};
+ rpcSendResponse(&rspMsg);
+ }
}
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
- if (code == 0) {
- mDebug("trans:%d, commit transaction", pMsg->id);
- sdbSetRawStatus(pRaw, SDB_STATUS_READY);
- if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
- code = terrno;
- mError("trans:%d, failed to write sdb while commit since %s", pMsg->id, terrstr());
- }
- trnSendRpcRsp(pMsg, code);
- } else {
- mDebug("trans:%d, rollback transaction", pMsg->id);
- sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
- if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
- mError("trans:%d, failed to write sdb while rollback since %s", pMsg->id, terrstr());
- }
- trnSendRpcRsp(pMsg, code);
- }
+ // todo
}
-static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) {
- for (int32_t i = 0; i < pArray->size; ++i) {
+static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
+ SSdb *pSdb = pMnode->pSdb;
+ int32_t arraySize = taosArrayGetSize(pArray);
+
+ for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
- if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
- return -1;
+ int32_t code = sdbWriteNotFree(pSdb, pRaw);
+ if (code != 0) {
+ return code;
}
}
return 0;
}
-static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoLogs); }
-
-static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoLogs); }
-
-static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->commitLogs); }
-
-static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoActions); }
-
-static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoActions); }
-
-static int32_t trnPerformPrepareStage(STrans *pTrans) {
- if (trnExecuteRedoLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
+static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = 0;
+ if (taosArrayGetSize(pTrans->redoLogs) != 0) {
+ code = mndTransExecuteArray(pMnode, pTrans->redoLogs);
+ if (code != 0) {
+ mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
+ } else {
+ mTrace("trans:%d, execute redo logs finished", pTrans->id)
+ }
}
+
+ return code;
}
-static int32_t trnPerformExecuteStage(STrans *pTrans) {
- int32_t code = trnExecuteRedoActions(pTrans);
+static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = 0;
+ if (taosArrayGetSize(pTrans->undoLogs) != 0) {
+ code = mndTransExecuteArray(pMnode, pTrans->undoLogs);
+ if (code != 0) {
+ mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
+ } else {
+ mTrace("trans:%d, execute undo logs finished", pTrans->id)
+ }
+ }
+
+ return code;
+}
+
+static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = 0;
+ if (taosArrayGetSize(pTrans->commitLogs) != 0) {
+ code = mndTransExecuteArray(pMnode, pTrans->commitLogs);
+ if (code != 0) {
+ mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
+ } else {
+ mTrace("trans:%d, execute commit logs finished", pTrans->id)
+ }
+ }
+
+ return code;
+}
+
+static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
+ if (taosArrayGetSize(pTrans->redoActions) != 0) {
+ mTrace("trans:%d, execute redo actions finished", pTrans->id);
+ }
+ return 0;
+}
+
+static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
+ if (taosArrayGetSize(pTrans->undoActions) != 0) {
+ mTrace("trans:%d, execute undo actions finished", pTrans->id);
+ }
+ return 0;
+}
+
+static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
+
+ if (code == 0) {
+ pTrans->stage = TRN_STAGE_EXECUTE;
+ mTrace("trans:%d, stage from prepare to execute", pTrans->id);
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
+ }
+
+ return 0;
+}
+
+static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT;
- return 0;
+ mTrace("trans:%d, stage from execute to commit", pTrans->id);
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
- return -1;
+ mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code));
+ return code;
} else {
- if (pTrans->policy == TRN_POLICY_RETRY) {
- pTrans->stage = TRN_STAGE_RETRY;
- } else {
+ if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_ROLLBACK;
+ mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
+ } else {
+ pTrans->stage = TRN_STAGE_RETRY;
+ mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr());
}
- return 0;
}
+
+ return 0;
}
-static int32_t trnPerformCommitStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
+static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
+
+ if (code == 0) {
+ pTrans->stage = TRN_STAGE_OVER;
+ mTrace("trans:%d, commit stage finished", pTrans->id);
+ } else {
+ if (pTrans->policy == TRN_POLICY_ROLLBACK) {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr());
+ } else {
+ pTrans->stage = TRN_STAGE_RETRY;
+ mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr());
+ }
+ }
+
+ return code;
+}
+
+static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
+
+ if (code == 0) {
+ mTrace("trans:%d, rollbacked", pTrans->id);
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
+ mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
}
+
+ return code;
}
-static int32_t trnPerformRollbackStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
+static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) {
+ int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
+
+ if (code == 0) {
+ pTrans->stage = TRN_STAGE_COMMIT;
+ mTrace("trans:%d, stage from retry to commit", pTrans->id);
} else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
+ pTrans->stage = TRN_STAGE_RETRY;
+ mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr());
}
+
+ return code;
}
-static int32_t trnPerformRetryStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
- }
-}
-
-int32_t mndTransExecute(SSdb *pSdb, int32_t tranId) {
+static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
- STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &tranId);
- if (pTrans == NULL) {
- return -1;
- }
-
- if (pTrans->stage == TRN_STAGE_PREPARE) {
- if (trnPerformPrepareStage(pTrans) != 0) {
- sdbRelease(pSdb, pTrans);
- return -1;
+ while (code == 0) {
+ switch (pTrans->stage) {
+ case TRN_STAGE_PREPARE:
+ code = mndTransPerformPrepareStage(pMnode, pTrans);
+ break;
+ case TRN_STAGE_EXECUTE:
+ code = mndTransPerformExecuteStage(pMnode, pTrans);
+ break;
+ case TRN_STAGE_COMMIT:
+ code = mndTransCommit(pMnode, pTrans);
+ if (code == 0) {
+ code = mndTransPerformCommitStage(pMnode, pTrans);
+ }
+ break;
+ case TRN_STAGE_ROLLBACK:
+ code = mndTransPerformRollbackStage(pMnode, pTrans);
+ if (code == 0) {
+ code = mndTransRollback(pMnode, pTrans);
+ }
+ break;
+ case TRN_STAGE_RETRY:
+ code = mndTransPerformRetryStage(pMnode, pTrans);
+ break;
+ default:
+ mndTransSendRpcRsp(pTrans, 0);
+ return;
}
}
- if (pTrans->stage == TRN_STAGE_EXECUTE) {
- if (trnPerformExecuteStage(pTrans) != 0) {
- sdbRelease(pSdb, pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_COMMIT) {
- if (trnPerformCommitStage(pTrans) != 0) {
- sdbRelease(pSdb, pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_ROLLBACK) {
- if (trnPerformRollbackStage(pTrans) != 0) {
- sdbRelease(pSdb, pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_RETRY) {
- if (trnPerformRetryStage(pTrans) != 0) {
- sdbRelease(pSdb, pTrans);
- return -1;
- }
- }
-
- sdbRelease(pSdb, pTrans);
- return 0;
-}
\ No newline at end of file
+ mndTransSendRpcRsp(pTrans, code);
+}
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index 2111047219..452c14886d 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -207,25 +207,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
mndTransDrop(pTrans);
return -1;
}
- sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
+ sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
- SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
- if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
- mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
-
- SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
- if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
- mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
-
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -251,15 +235,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
- SSdbRaw *pUndoRaw = mndUserActionEncode(pOldUser);
- if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
- mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
-
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@@ -283,25 +259,9 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
mndTransDrop(pTrans);
return -1;
}
- sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
+ sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
- SSdbRaw *pUndoRaw = mndUserActionEncode(pUser);
- if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
- mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
-
- SSdbRaw *pCommitRaw = mndUserActionEncode(pUser);
- if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
- mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
- sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
-
- if (mndTransPrepare(pTrans) != 0) {
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c
index 8d8daf5ce5..c3874caffc 100644
--- a/source/dnode/mnode/sdb/src/sdbHash.c
+++ b/source/dnode/mnode/sdb/src/sdbHash.c
@@ -132,11 +132,6 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock);
- SdbDeleteFp deleteFp = pSdb->deleteFps[pOldRow->type];
- if (deleteFp != NULL) {
- code = (*deleteFp)(pSdb, pOldRow->pObj);
- }
-
sdbRelease(pSdb, pOldRow->pObj);
sdbFreeRow(pRow);
return code;
@@ -161,6 +156,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
case SDB_STATUS_CREATING:
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break;
+ case SDB_STATUS_UPDATING:
case SDB_STATUS_READY:
case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
@@ -228,6 +224,11 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
+ SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
+ if (deleteFp != NULL) {
+ (*deleteFp)(pSdb, pRow->pObj);
+ }
+
sdbFreeRow(pRow);
}
diff --git a/source/dnode/vnode/impl/inc/vnodeBufferPool.h b/source/dnode/vnode/impl/inc/vnodeBufferPool.h
index bfc4de9e12..d96671d2bd 100644
--- a/source/dnode/vnode/impl/inc/vnodeBufferPool.h
+++ b/source/dnode/vnode/impl/inc/vnodeBufferPool.h
@@ -27,6 +27,8 @@ typedef struct SVBufPool SVBufPool;
int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseBufPool(SVnode *pVnode);
+int vnodeBufPoolSwitch(SVnode *pVnode);
+int vnodeBufPoolRecycle(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode *pVnode);
diff --git a/source/dnode/vnode/impl/inc/vnodeCommit.h b/source/dnode/vnode/impl/inc/vnodeCommit.h
index a60e8feac2..031089ba14 100644
--- a/source/dnode/vnode/impl/inc/vnodeCommit.h
+++ b/source/dnode/vnode/impl/inc/vnodeCommit.h
@@ -24,6 +24,7 @@ extern "C" {
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeAsyncCommit(SVnode *pVnode);
+int vnodeCommit(void *arg);
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h
index e3a3fac6b9..e6a88c7629 100644
--- a/source/dnode/vnode/impl/inc/vnodeDef.h
+++ b/source/dnode/vnode/impl/inc/vnodeDef.h
@@ -21,9 +21,11 @@
#include "tcoding.h"
#include "tdlist.h"
#include "tlockfree.h"
+#include "tmacro.h"
#include "wal.h"
#include "vnode.h"
+
#include "vnodeBufferPool.h"
#include "vnodeCfg.h"
#include "vnodeCommit.h"
@@ -37,6 +39,27 @@
extern "C" {
#endif
+typedef struct SVnodeTask {
+ TD_DLIST_NODE(SVnodeTask);
+ void* arg;
+ int (*execute)(void*);
+} SVnodeTask;
+
+typedef struct SVnodeMgr {
+ td_mode_flag_t vnodeInitFlag;
+ td_mode_flag_t vnodeClearFlag;
+ // For commit
+ bool stop;
+ uint16_t nthreads;
+ pthread_t* threads;
+ pthread_mutex_t mutex;
+ pthread_cond_t hasTask;
+ TD_DLIST(SVnodeTask) queue;
+ // For vnode Mgmt
+} SVnodeMgr;
+
+extern SVnodeMgr vnodeMgr;
+
struct SVnode {
char* path;
SVnodeCfg config;
@@ -50,6 +73,8 @@ struct SVnode {
SVnodeFS* pFs;
};
+int vnodeScheduleTask(SVnodeTask* task);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/vnode/impl/inc/vnodeMemAllocator.h b/source/dnode/vnode/impl/inc/vnodeMemAllocator.h
index c8c58e9f69..bdafbf31a7 100644
--- a/source/dnode/vnode/impl/inc/vnodeMemAllocator.h
+++ b/source/dnode/vnode/impl/inc/vnodeMemAllocator.h
@@ -33,6 +33,7 @@ struct SVArenaNode {
};
struct SVMemAllocator {
+ T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c
index 1db15c3990..152a346f0a 100644
--- a/source/dnode/vnode/impl/src/vnodeBufferPool.c
+++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c
@@ -19,6 +19,8 @@
#define VNODE_BUF_POOL_SHARDS 3
struct SVBufPool {
+ pthread_mutex_t mutex;
+ pthread_cond_t hasFree;
TD_DLIST(SVMemAllocator) free;
TD_DLIST(SVMemAllocator) incycle;
SVMemAllocator *inuse;
@@ -28,7 +30,6 @@ struct SVBufPool {
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
- // EVMemAllocatorT type = E_V_ARENA_ALLOCATOR;
if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) {
/* TODO */
@@ -79,6 +80,27 @@ void vnodeCloseBufPool(SVnode *pVnode) {
}
}
+int vnodeBufPoolSwitch(SVnode *pVnode) {
+ SVMemAllocator *pvma = pVnode->pBufPool->inuse;
+
+ pVnode->pBufPool->inuse = NULL;
+
+ tDListAppend(&(pVnode->pBufPool->incycle), pvma);
+ return 0;
+}
+
+int vnodeBufPoolRecycle(SVnode *pVnode) {
+ SVBufPool * pBufPool = pVnode->pBufPool;
+ SVMemAllocator *pvma = TD_DLIST_HEAD(&(pBufPool->incycle));
+ ASSERT(pvma != NULL);
+
+ tDListPop(&(pBufPool->incycle), pvma);
+ vmaReset(pvma);
+ tDListAppend(&(pBufPool->free), pvma);
+
+ return 0;
+}
+
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
SVBufPool *pBufPool = pVnode->pBufPool;
@@ -89,6 +111,8 @@ void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
if (pBufPool->inuse) {
tDListPop(&(pBufPool->free), pBufPool->inuse);
break;
+ } else {
+ // tsem_wait(&(pBufPool->hasFree));
}
}
}
diff --git a/source/dnode/vnode/impl/src/vnodeCfg.c b/source/dnode/vnode/impl/src/vnodeCfg.c
index f5bb7e35d2..97c3cc9cee 100644
--- a/source/dnode/vnode/impl/src/vnodeCfg.c
+++ b/source/dnode/vnode/impl/src/vnodeCfg.c
@@ -15,7 +15,8 @@
#include "vnodeDef.h"
-const SVnodeCfg defaultVnodeOptions = {.wsize = 16 * 1024 * 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */
+const SVnodeCfg defaultVnodeOptions = {
+ .wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */
void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */
vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions);
diff --git a/source/dnode/vnode/impl/src/vnodeCommit.c b/source/dnode/vnode/impl/src/vnodeCommit.c
index cac7999f59..a728de0ebb 100644
--- a/source/dnode/vnode/impl/src/vnodeCommit.c
+++ b/source/dnode/vnode/impl/src/vnodeCommit.c
@@ -19,28 +19,21 @@ static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
int vnodeAsyncCommit(SVnode *pVnode) {
-#if 0
- if (vnodeStartCommit(pVnode) < 0) {
- // TODO
- }
+ vnodeBufPoolSwitch(pVnode);
+ SVnodeTask *pTask = (SVnodeTask *)malloc(sizeof(*pTask));
- if (tqCommit(pVnode->pTQ) < 0) {
- // TODO
- }
+ pTask->execute = vnodeCommit; // TODO
+ pTask->arg = pVnode; // TODO
- if (metaCommit(pVnode->pMeta) < 0) {
- // TODO
- }
+ vnodeScheduleTask(pTask);
+ return 0;
+}
- if (tsdbCommit(pVnode->pTsdb) < 0) {
- // TODO
- }
+int vnodeCommit(void *arg) {
+ SVnode *pVnode = (SVnode *)arg;
- if (vnodeEndCommit(pVnode) < 0) {
- // TODO
- }
-
-#endif
+ vnodeBufPoolRecycle(pVnode);
+ // TODO
return 0;
}
diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c
index 9b94b4a361..59e3bae5d7 100644
--- a/source/dnode/vnode/impl/src/vnodeMain.c
+++ b/source/dnode/vnode/impl/src/vnodeMain.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include "tmacro.h"
#include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
@@ -21,29 +20,6 @@ static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode);
-TD_DEF_MOD_INIT_FLAG(vnode);
-TD_DEF_MOD_CLEAR_FLAG(vnode);
-
-int vnodeInit() {
- if (TD_CHECK_AND_SET_MODE_INIT(vnode) == TD_MOD_INITIALIZED) {
- return 0;
- }
-
- if (walInit() < 0) {
- return -1;
- }
-
- return 0;
-}
-
-void vnodeClear() {
- if (TD_CHECK_AND_SET_MOD_CLEAR(vnode) == TD_MOD_CLEARD) {
- return;
- }
-
- walCleanUp();
-}
-
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL;
diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c
new file mode 100644
index 0000000000..964cbe77da
--- /dev/null
+++ b/source/dnode/vnode/impl/src/vnodeMgr.c
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "vnodeDef.h"
+
+SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED, .vnodeClearFlag = TD_MOD_UNCLEARD, .stop = false};
+
+static void* loop(void* arg);
+
+int vnodeInit(uint16_t nthreads) {
+ if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
+ return 0;
+ }
+
+ // Start commit handers
+ if (nthreads > 0) {
+ vnodeMgr.nthreads = nthreads;
+ vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
+ if (vnodeMgr.threads == NULL) {
+ return -1;
+ }
+
+ pthread_mutex_init(&(vnodeMgr.mutex), NULL);
+ pthread_cond_init(&(vnodeMgr.hasTask), NULL);
+ tDListInit(&(vnodeMgr.queue));
+
+ for (uint16_t i = 0; i < nthreads; i++) {
+ pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
+ }
+ } else {
+ // TODO: if no commit thread is set, then another mechanism should be
+ // given. Otherwise, it is a false.
+ ASSERT(0);
+ }
+
+ if (walInit() < 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+void vnodeClear() {
+ if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeClearFlag)) == TD_MOD_CLEARD) {
+ return;
+ }
+
+ walCleanUp();
+
+ // Stop commit handler
+ pthread_mutex_lock(&(vnodeMgr.mutex));
+ vnodeMgr.stop = true;
+ pthread_cond_broadcast(&(vnodeMgr.hasTask));
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+
+ for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) {
+ pthread_join(vnodeMgr.threads[i], NULL);
+ }
+
+ tfree(vnodeMgr.threads);
+ pthread_cond_destroy(&(vnodeMgr.hasTask));
+ pthread_mutex_destroy(&(vnodeMgr.mutex));
+}
+
+int vnodeScheduleTask(SVnodeTask* pTask) {
+ pthread_mutex_lock(&(vnodeMgr.mutex));
+
+ tDListAppend(&(vnodeMgr.queue), pTask);
+
+ pthread_cond_signal(&(vnodeMgr.hasTask));
+
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+
+ return 0;
+}
+
+/* ------------------------ STATIC METHODS ------------------------ */
+static void* loop(void* arg) {
+ SVnodeTask* pTask;
+ for (;;) {
+ pthread_mutex_lock(&(vnodeMgr.mutex));
+ for (;;) {
+ pTask = TD_DLIST_HEAD(&(vnodeMgr.queue));
+ if (pTask == NULL) {
+ if (vnodeMgr.stop) {
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+ return NULL;
+ } else {
+ pthread_cond_wait(&(vnodeMgr.hasTask), &(vnodeMgr.mutex));
+ }
+ } else {
+ tDListPop(&(vnodeMgr.queue), pTask);
+ break;
+ }
+ }
+
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+
+ (*(pTask->execute))(pTask->arg);
+ }
+
+ return NULL;
+}
\ No newline at end of file
diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp
index ac2ccbc132..a25b04e161 100644
--- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp
+++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp
@@ -92,7 +92,7 @@ TEST(vnodeApiTest, test_create_table_encode_and_decode_function) {
#endif
TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
- GTEST_ASSERT_GE(vnodeInit(), 0);
+ GTEST_ASSERT_GE(vnodeInit(2), 0);
// Create and open a vnode
SVnode *pVnode = vnodeOpen("vnode1", NULL);
diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h
index c2ab61bf5f..8050b85b08 100644
--- a/source/libs/index/inc/index_fst_automation.h
+++ b/source/libs/index/inc/index_fst_automation.h
@@ -20,6 +20,8 @@ extern "C" {
#endif
#include "index_fst_util.h"
+
+
typedef struct AutomationCtx AutomationCtx;
typedef enum AutomationType {
@@ -38,18 +40,30 @@ typedef struct Complement {
// automation
typedef struct AutomationCtx {
AutomationType type;
- void *data;
+ void *stdata;
+ char *data;
} AutomationCtx;
-
+typedef enum ValueType { FST_INT, FST_CHAR, FST_ARRAY} ValueType;
typedef enum StartWithStateKind { Done, Running } StartWithStateKind;
typedef struct StartWithStateValue {
StartWithStateKind kind;
- void *value;
+ ValueType type;
+ union {
+ int val;
+ char *ptr;
+ SArray *arr;
+ // add more type
+ } ;
} StartWithStateValue;
+StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void *val);
+StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv);
+void startWithStateValueDestroy(void *sv);
+
+
typedef struct AutomationFunc {
void* (*start)(AutomationCtx *ctx) ;
bool (*isMatch)(AutomationCtx *ctx, void *);
@@ -59,7 +73,7 @@ typedef struct AutomationFunc {
void* (*acceptEof)(AutomationCtx *ct, void *state);
} AutomationFunc;
-AutomationCtx *automCtxCreate(void *data, AutomationType type);
+AutomationCtx *automCtxCreate(void *data, AutomationType atype);
void automCtxDestroy(AutomationCtx *ctx);
extern AutomationFunc automFuncs[];
diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c
index 07f1e343bd..37bdcb0ecf 100644
--- a/source/libs/index/src/index_fst.c
+++ b/source/libs/index/src/index_fst.c
@@ -17,6 +17,7 @@
#include "tcoding.h"
#include "tchecksum.h"
#include "indexInt.h"
+#include "index_fst_automation.h"
static void fstPackDeltaIn(FstCountingWriter *wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
@@ -1322,6 +1323,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
return swsResultCreate(&s, output, callback(start));
}
}
+ SArray *nodes = taosArrayInit(8, sizeof(FstNode *));
while (taosArrayGetSize(sws->stack) > 0) {
StreamState *p = (StreamState *)taosArrayPop(sws->stack);
if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) {
@@ -1337,8 +1339,8 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp);
void* tState = callback(nextState);
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
- //bool isMatch = sws->aut->isMatch(nextState);
FstNode *nextNode = fstGetNode(sws->fst, trn.addr);
+ taosArrayPush(nodes, &nextNode);
taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) {
@@ -1354,26 +1356,35 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
taosArrayPush(sws->stack, &s2);
- uint8_t *buf = (uint8_t *)malloc(taosArrayGetSize(sws->inp) * sizeof(uint8_t));
- for (uint32_t i = 0; i < taosArrayGetSize(sws->inp); i++) {
- uint8_t *t = (uint8_t *)taosArrayGet(sws->inp, i);
- buf[i] = *t;
+
+ size_t isz = taosArrayGetSize(sws->inp);
+ uint8_t *buf = (uint8_t *)malloc(isz * sizeof(uint8_t));
+ for (uint32_t i = 0; i < isz; i++) {
+ buf[i] = *(uint8_t *)taosArrayGet(sws->inp, i);
}
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy);
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
+ free(buf);
fstSliceDestroy(&slice);
return NULL;
}
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
- StreamWithStateResult *result = swsResultCreate(&slice, fOutput , tState);
+ StreamWithStateResult *result = swsResultCreate(&slice, fOutput, tState);
+ free(buf);
fstSliceDestroy(&slice);
return result;
}
+ free(buf);
fstSliceDestroy(&slice);
}
+ for (size_t i = 0; i < taosArrayGetSize(nodes); i++) {
+ FstNode** node = (FstNode **)taosArrayGet(nodes, i);
+ fstNodeDestroy(*node);
+ }
+ taosArrayDestroy(nodes);
return NULL;
}
@@ -1392,6 +1403,7 @@ void swsResultDestroy(StreamWithStateResult *result) {
if (NULL == result) { return; }
fstSliceDestroy(&result->data);
+ startWithStateValueDestroy(result->state);
free(result);
}
diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c
index 6a08b41b12..d905147654 100644
--- a/source/libs/index/src/index_fst_automation.c
+++ b/source/libs/index/src/index_fst_automation.c
@@ -16,21 +16,81 @@
#include "index_fst_automation.h"
+StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void *val) {
+ StartWithStateValue *nsv = calloc(1, sizeof(StartWithStateValue));
+ if (nsv == NULL) { return NULL; }
+
+ nsv->kind = kind;
+ nsv->type = ty;
+ if (ty == FST_INT) {
+ nsv->val = *(int *)val;
+ } else if (ty == FST_CHAR) {
+ size_t len = strlen((char *)val);
+ nsv->ptr = (char *)calloc(1, len + 1);
+ memcpy(nsv->ptr, val, len);
+ } else if (ty == FST_ARRAY) {
+ //TODO,
+ //nsv->arr = taosArrayFromList()
+ }
+ return nsv;
+}
+void startWithStateValueDestroy(void *val) {
+ StartWithStateValue *sv = (StartWithStateValue *)val;
+ if (sv == NULL) { return; }
+
+ if (sv->type == FST_INT) {
+ //
+ } else if (sv->type == FST_CHAR) {
+ free(sv->ptr);
+ } else if (sv->type == FST_ARRAY) {
+ taosArrayDestroy(sv->arr);
+ }
+ free(sv);
+}
+StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
+ StartWithStateValue *nsv = calloc(1, sizeof(StartWithStateValue));
+ if (nsv == NULL) { return NULL; }
+
+ nsv->kind = sv->kind;
+ nsv->type= sv->type;
+ if (nsv->type == FST_INT) {
+ nsv->val = sv->val;
+ } else if (nsv->type == FST_CHAR) {
+ size_t len = strlen(sv->ptr);
+ nsv->ptr = (char *)calloc(1, len + 1);
+ memcpy(nsv->ptr, sv->ptr, len);
+ } else if (nsv->type == FST_ARRAY) {
+ }
+ return nsv;
+}
+
+
// prefix query, impl later
+
static void* prefixStart(AutomationCtx *ctx) {
StartWithStateValue *data = (StartWithStateValue *)(ctx->data);
- return data;
+ return startWithStateValueDump(data);
};
-static bool prefixIsMatch(AutomationCtx *ctx, void *data) {
- return true;
+static bool prefixIsMatch(AutomationCtx *ctx, void *sv) {
+ StartWithStateValue* ssv = (StartWithStateValue *)sv;
+ return ssv->val == strlen(ctx->data);
}
-static bool prefixCanMatch(AutomationCtx *ctx, void *data) {
- return true;
+static bool prefixCanMatch(AutomationCtx *ctx, void *sv) {
+ StartWithStateValue* ssv = (StartWithStateValue *)sv;
+ return ssv->val >= 0;
}
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
}
static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
+ StartWithStateValue* ssv = (StartWithStateValue *)state;
+ if (ssv == NULL || ctx == NULL) {return NULL;}
+
+ char *data = ctx->data;
+ if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
+ int val = ssv->val + 1;
+ return startWithStateValueCreate(Running, FST_INT, &val);
+ }
return NULL;
}
static void* prefixAcceptEof(AutomationCtx *ctx, void *state) {
@@ -79,28 +139,33 @@ AutomationFunc automFuncs[] = {{
// add more search type
};
-AutomationCtx* automCtxCreate(void *data, AutomationType type) {
+AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { return NULL; }
- if (type == AUTOMATION_PREFIX) {
- StartWithStateValue *swsv = (StartWithStateValue *)calloc(1, sizeof(StartWithStateValue));
- swsv->kind = Done;
- swsv->value = NULL;
- ctx->data = (void *)swsv;
- } else if (type == AUTMMATION_MATCH) {
+ StartWithStateValue *sv = NULL;
+ if (atype == AUTOMATION_PREFIX) {
+ sv = startWithStateValueCreate(Running, FST_INT, 0);
+ ctx->stdata = (void *)sv;
+ } else if (atype == AUTMMATION_MATCH) {
} else {
// add more search type
}
- ctx->type = type;
+ char* src = (char *)data;
+ size_t len = strlen(src);
+ char* dst = (char *)malloc(len * sizeof(char) + 1);
+ memcpy(dst, src, len);
+ dst[len] = 0;
+
+ ctx->data = dst;
+ ctx->type = atype;
+ ctx->stdata = (void *)sv;
return ctx;
}
void automCtxDestroy(AutomationCtx *ctx) {
- if (ctx->type == AUTOMATION_PREFIX) {
- free(ctx->data);
- } else if (ctx->type == AUTMMATION_MATCH) {
- }
+ startWithStateValueDestroy(ctx->stdata);
+ free(ctx->data);
free(ctx);
}
diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp
index bb9271a3c8..a2078defda 100644
--- a/source/libs/parser/test/plannerTest.cpp
+++ b/source/libs/parser/test/plannerTest.cpp
@@ -30,6 +30,7 @@
#include "tdef.h"
#include "tvariant.h"
#include "planner.h"
+#include "../../planner/inc/plannerInt.h"
namespace {
void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_t colId) {
@@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ(ret, 0);
struct SQueryPlanNode* n = nullptr;
- code = qCreateQueryPlan(pQueryInfo, &n);
+ code = createQueryPlan(pQueryInfo, &n);
char* str = NULL;
- qQueryPlanToString(n, &str);
+ queryPlanToString(n, &str);
printf("--------SQL:%s\n", sql);
printf("%s\n", str);
@@ -155,10 +156,10 @@ TEST(testCase, planner_test) {
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
- code = qCreateQueryPlan(pQueryInfo, &n);
+ code = createQueryPlan(pQueryInfo, &n);
char* str = NULL;
- qQueryPlanToString(n, &str);
+ queryPlanToString(n, &str);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h
index 6bd89905b1..2231c93362 100644
--- a/source/libs/planner/inc/plannerInt.h
+++ b/source/libs/planner/inc/plannerInt.h
@@ -25,6 +25,19 @@ extern "C" {
#include "planner.h"
#include "taosmsg.h"
+enum LOGIC_PLAN_E {
+ LP_SCAN = 1,
+ LP_SESSION = 2,
+ LP_STATE = 3,
+ LP_INTERVAL = 4,
+ LP_FILL = 5,
+ LP_AGG = 6,
+ LP_JOIN = 7,
+ LP_PROJECT = 8,
+ LP_DISTINCT = 9,
+ LP_ORDER = 10
+};
+
typedef struct SQueryNodeBasicInfo {
int32_t type; // operator type
char *name; // operator name
@@ -57,50 +70,94 @@ typedef struct SQueryPlanNode {
struct SQueryPlanNode *nextNode;
} SQueryPlanNode;
-typedef struct SQueryDistPlanNode {
+typedef SSchema SSlotSchema;
+
+typedef struct SDataBlockSchema {
+ int32_t index;
+ SSlotSchema *pSchema;
+ int32_t numOfCols; // number of columns
+} SDataBlockSchema;
+
+typedef struct SPhyNode {
SQueryNodeBasicInfo info;
- SSchema *pSchema; // the schema of the input SSDatablock
- int32_t numOfCols; // number of input columns
- SArray *pExpr; // the query functions or sql aggregations
- int32_t numOfExpr; // number of result columns, which is also the number of pExprs
- void *pExtInfo; // additional information
+ SArray *pTargets; // target list to be computed or scanned at this node
+ SArray *pConditions; // implicitly-ANDed qual conditions
+ SDataBlockSchema targetSchema;
+ // children plan to generated result for current node to process
+ // in case of join, multiple plan nodes exist.
+ SArray *pChildren;
+} SPhyNode;
- // previous operator to generated result for current node to process
- // in case of join, multiple prev nodes exist.
- SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources.
-} SQueryDistPlanNode;
+typedef struct SScanPhyNode {
+ SPhyNode node;
+ uint64_t uid; // unique id of the table
+} SScanPhyNode;
-typedef struct SQueryCostSummary {
- int64_t startTs; // Object created and added into the message queue
- int64_t endTs; // the timestamp when the task is completed
- int64_t cputime; // total cpu cost, not execute elapsed time
+typedef SScanPhyNode STagScanPhyNode;
- int64_t loadRemoteDataDuration; // remote io time
- int64_t loadNativeDataDuration; // native disk io time
+typedef SScanPhyNode SSystemTableScanPhyNode;
- uint64_t loadNativeData; // blocks + SMA + header files
- uint64_t loadRemoteData; // remote data acquired by exchange operator.
+typedef struct SMultiTableScanPhyNode {
+ SScanPhyNode scan;
+ SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
+} SMultiTableScanPhyNode;
- uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it
- int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue.
+typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode;
- uint64_t totalRows;
- uint64_t loadRows;
- uint32_t totalBlocks;
- uint32_t loadBlocks;
- uint32_t loadBlockAgg;
- uint32_t skipBlocks;
- uint64_t resultSize; // generated result size in Kb.
-} SQueryCostSummary;
+typedef struct SProjectPhyNode {
+ SPhyNode node;
+} SProjectPhyNode;
-typedef struct SQueryTask {
- uint64_t queryId; // query id
- uint64_t taskId; // task id
- SQueryDistPlanNode *pNode; // operator tree
- uint64_t status; // task status
- SQueryCostSummary summary; // task execution summary
- void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
-} SQueryTask;
+/**
+ * Optimize the query execution plan, currently not implement yet.
+ * @param pQueryNode
+ * @return
+ */
+int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
+
+/**
+ * Create the query plan according to the bound AST, which is in the form of pQueryInfo
+ * @param pQueryInfo
+ * @param pQueryNode
+ * @return
+ */
+int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
+
+/**
+ * Convert the query plan to string, in order to display it in the shell.
+ * @param pQueryNode
+ * @return
+ */
+int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
+
+/**
+ * Restore the SQL statement according to the logic query plan.
+ * @param pQueryNode
+ * @param sql
+ * @return
+ */
+int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
+
+/**
+ * Convert to physical plan to string to enable to print it out in the shell.
+ * @param pPhyNode
+ * @param str
+ * @return
+ */
+int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str);
+
+/**
+ * Destroy the query plan object.
+ * @return
+ */
+void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
+
+/**
+ * Destroy the physical plan.
+ * @param pQueryPhyNode
+ * @return
+ */
+void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode);
#ifdef __cplusplus
}
diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/source/libs/planner/src/physicalPlan.c
similarity index 59%
rename from src/tsdb/inc/tsdbCommitQueue.h
rename to source/libs/planner/src/physicalPlan.c
index b690e3bdc2..2bdc159af8 100644
--- a/src/tsdb/inc/tsdbCommitQueue.h
+++ b/source/libs/planner/src/physicalPlan.c
@@ -13,11 +13,24 @@
* along with this program. If not, see .
*/
-#ifndef _TD_TSDB_COMMIT_QUEUE_H_
-#define _TD_TSDB_COMMIT_QUEUE_H_
+#include "plannerInt.h"
-typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
+SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) {
+ return NULL;
+}
-int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
+SPhyNode* createPhyNode(SQueryPlanNode* node) {
+ switch (node->info.type) {
+ case LP_SCAN:
+ return createScanNode(node);
+ }
+ return NULL;
+}
-#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
+SPhyNode* createSubplan(SQueryPlanNode* pSubquery) {
+ return NULL;
+}
+
+int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
+ return 0;
+}
diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c
index 79c7691698..e54b847230 100644
--- a/source/libs/planner/src/planner.c
+++ b/source/libs/planner/src/planner.c
@@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
-int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
+int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
-int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
+int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
assert(taosArrayGetSize(upstream) == 1);
@@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP
return TSDB_CODE_SUCCESS;
}
-int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
+int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
return 0;
}
-int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode) {
+int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
+
return 0;
}
-int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str) {
+int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) {
return 0;
}
-void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) {
+void* destroyQueryPlan(SQueryPlanNode* pQueryNode) {
if (pQueryNode == NULL) {
return NULL;
}
@@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) {
return NULL;
}
-void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode) {
+void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) {
return NULL;
}
-int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob) {
- return 0;
-}
-
//======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
@@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
return len;
}
-int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
+int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
assert(pQueryNode);
*str = calloc(1, 4096);
diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h
index e546a87326..48142878c3 100644
--- a/source/libs/wal/inc/walInt.h
+++ b/source/libs/wal/inc/walInt.h
@@ -33,12 +33,10 @@ typedef struct WalFileInfo {
int64_t fileSize;
} WalFileInfo;
-#pragma pack(push,1)
typedef struct WalIdxEntry {
int64_t ver;
int64_t offset;
} WalIdxEntry;
-#pragma pack(pop)
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
@@ -107,8 +105,16 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
return taosCalcChecksum(0, (uint8_t*)body, len);
}
-int walReadMeta(SWal* pWal);
-int walWriteMeta(SWal* pWal);
+static inline void walResetVer(SWalVer* pVer) {
+ pVer->firstVer = -1;
+ pVer->verInSnapshotting = -1;
+ pVer->snapshotVer = -1;
+ pVer->commitVer = -1;
+ pVer->lastVer = -1;
+}
+
+int walLoadMeta(SWal* pWal);
+int walSaveMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal);
char* walMetaSerialize(SWal* pWal);
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index 49f4fde3a0..aa592b4fe8 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -24,18 +24,22 @@
#include
#include
-int64_t walGetFirstVer(SWal *pWal) {
+int64_t inline walGetFirstVer(SWal *pWal) {
return pWal->vers.firstVer;
}
-int64_t walGetSnaphostVer(SWal *pWal) {
+int64_t inline walGetSnaphostVer(SWal *pWal) {
return pWal->vers.snapshotVer;
}
-int64_t walGetLastVer(SWal *pWal) {
+int64_t inline walGetLastVer(SWal *pWal) {
return pWal->vers.lastVer;
}
+static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
+ return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
+}
+
int walRollFileInfo(SWal* pWal) {
int64_t ts = taosGetTimestampSec();
@@ -150,10 +154,6 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
return 0;
}
-static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
- return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
-}
-
static int walFindCurMetaVer(SWal* pWal) {
const char * pattern = "^meta-ver[0-9]+$";
regex_t walMetaRegexPattern;
@@ -182,7 +182,7 @@ static int walFindCurMetaVer(SWal* pWal) {
return metaVer;
}
-int walWriteMeta(SWal* pWal) {
+int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer+1, fnameStr);
@@ -207,7 +207,7 @@ int walWriteMeta(SWal* pWal) {
return 0;
}
-int walReadMeta(SWal* pWal) {
+int walLoadMeta(SWal* pWal) {
ASSERT(pWal->fileInfoSet->size == 0);
//find existing meta file
int metaVer = walFindCurMetaVer(pWal);
diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c
index 629451a722..9efeb83cf0 100644
--- a/source/libs/wal/src/walMgmt.c
+++ b/source/libs/wal/src/walMgmt.c
@@ -21,23 +21,17 @@
#include "compare.h"
#include "walInt.h"
-//internal
-int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
-int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
-int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
-
typedef struct {
- int32_t refSetId;
- uint32_t seq;
int8_t stop;
int8_t inited;
+ uint32_t seq;
+ int32_t refSetId;
pthread_t thread;
} SWalMgmt;
static SWalMgmt tsWal = {0, .seq = 1};
static int32_t walCreateThread();
static void walStopThread();
-static int32_t walInitObj(SWal *pWal);
static void walFreeObj(void *pWal);
int64_t walGetSeq() {
@@ -68,7 +62,7 @@ int32_t walInit() {
}
void walCleanUp() {
- int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
+ int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
if(old == 0) {
return;
}
@@ -83,48 +77,59 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
- memset(pWal, 0, sizeof(SWal));
- pWal->writeLogTfd = -1;
- pWal->writeIdxTfd = -1;
- pWal->writeCur = -1;
//set config
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
+ pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
+ if(pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
- //init version info
- pWal->vers.firstVer = -1;
- pWal->vers.commitVer = -1;
- pWal->vers.snapshotVer = -1;
- pWal->vers.lastVer = -1;
+ tstrncpy(pWal->path, path, sizeof(pWal->path));
+ if(taosMkDir(pWal->path) != 0) {
+ wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
+ return NULL;
+ }
- pWal->vers.verInSnapshotting = -1;
-
- pWal->totSize = 0;
+ //open meta
+ pWal->writeLogTfd = -1;
+ pWal->writeIdxTfd = -1;
+ pWal->writeCur = -1;
+ pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
+ if(pWal->fileInfoSet == NULL) {
+ wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
+ free(pWal);
+ return NULL;
+ }
//init status
+ walResetVer(&pWal->vers);
+ pWal->totSize = 0;
pWal->lastRollSeq = -1;
//init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead));
- pWal->writeHead.head.sver = 0;
+ pWal->writeHead.head.headVer = WAL_HEAD_VER;
- tstrncpy(pWal->path, path, sizeof(pWal->path));
- pthread_mutex_init(&pWal->mutex, NULL);
-
- pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
- if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
-
- if (walInitObj(pWal) != 0) {
- walFreeObj(pWal);
+ if(pthread_mutex_init(&pWal->mutex, NULL) < 0) {
+ taosArrayDestroy(pWal->fileInfoSet);
+ free(pWal);
return NULL;
}
- pWal->refId = taosAddRef(tsWal.refSetId, pWal);
- if (pWal->refId < 0) {
- walFreeObj(pWal);
+ pWal->refId = taosAddRef(tsWal.refSetId, pWal);
+ if(pWal->refId < 0) {
+ pthread_mutex_destroy(&pWal->mutex);
+ taosArrayDestroy(pWal->fileInfoSet);
+ free(pWal);
+ return NULL;
+ }
+
+ if(walLoadMeta(pWal) < 0) {
+ taosRemoveRef(tsWal.refSetId, pWal->refId);
+ pthread_mutex_destroy(&pWal->mutex);
+ taosArrayDestroy(pWal->fileInfoSet);
+ free(pWal);
return NULL;
}
- walReadMeta(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
@@ -152,43 +157,23 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
}
void walClose(SWal *pWal) {
- if (pWal == NULL) return;
-
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->writeLogTfd);
pWal->writeLogTfd = -1;
tfClose(pWal->writeIdxTfd);
pWal->writeIdxTfd = -1;
- walWriteMeta(pWal);
+ walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
pthread_mutex_unlock(&pWal->mutex);
+
taosRemoveRef(tsWal.refSetId, pWal->refId);
}
-static int32_t walInitObj(SWal *pWal) {
- if (taosMkDir(pWal->path) != 0) {
- wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
- pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
- if(pWal->fileInfoSet == NULL) {
- wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
-
- wDebug("vgId:%d, object is initialized", pWal->cfg.vgId);
- return 0;
-}
-
static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
- tfClose(pWal->writeLogTfd);
- tfClose(pWal->writeIdxTfd);
- taosArrayDestroy(pWal->fileInfoSet);
- pWal->fileInfoSet = NULL;
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index b6aafedea3..42fcb8c375 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -54,7 +54,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
int64_t logTfd = pRead->readLogTfd;
//seek position
- int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
+ int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code < 0) {
return -1;
@@ -210,6 +210,6 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return 0;
}
-int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
- return 0;
-}
+/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
+ /*return 0;*/
+/*}*/
diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c
index 953aae703c..7db5b90c1d 100644
--- a/source/libs/wal/src/walSeek.c
+++ b/source/libs/wal/src/walSeek.c
@@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t logTfd = pWal->writeLogTfd;
//seek position
- int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
+ int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
return -1;
@@ -66,8 +66,6 @@ int walChangeFileToLast(SWal *pWal) {
//switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
- //change status
- pWal->curStatus = WAL_CUR_FILE_WRITABLE;
return 0;
}
@@ -93,13 +91,11 @@ int walChangeFile(SWal *pWal, int64_t ver) {
int64_t fileFirstVer = pRet->firstVer;
//closed
if(taosArrayGetLast(pWal->fileInfoSet) != pRet) {
- pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenRead(fnameStr);
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenRead(fnameStr);
} else {
- pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
walBuildLogName(pWal, fileFirstVer, fnameStr);
diff --git a/source/libs/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c
deleted file mode 100644
index 849d0c3e51..0000000000
--- a/source/libs/wal/src/walUtil.c
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "walInt.h"
-
-#if 0
-int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
- int64_t curFileId = *nextFileId;
- int64_t minFileId = INT64_MAX;
-
- DIR *dir = opendir(pWal->path);
- if (dir == NULL) {
- wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
- return -1;
- }
-
- struct dirent *ent;
- while ((ent = readdir(dir)) != NULL) {
- char *name = ent->d_name;
-
- if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
- int64_t id = atoll(name + WAL_PREFIX_LEN);
- if (id <= curFileId) continue;
-
- if (id < minFileId) {
- minFileId = id;
- }
- }
- }
- closedir(dir);
-
- if (minFileId == INT64_MAX) return -1;
-
- *nextFileId = minFileId;
- wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
-
- return 0;
-}
-
-int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
- int64_t minFileId = INT64_MAX;
-
- DIR *dir = opendir(pWal->path);
- if (dir == NULL) {
- wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
- return -1;
- }
-
- struct dirent *ent;
- while ((ent = readdir(dir)) != NULL) {
- char *name = ent->d_name;
-
- if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
- int64_t id = atoll(name + WAL_PREFIX_LEN);
- if (id >= curFileId) continue;
-
- minDiff--;
- if (id < minFileId) {
- minFileId = id;
- }
- }
- }
- closedir(dir);
-
- if (minFileId == INT64_MAX) return -1;
- if (minDiff > 0) return -1;
-
- *oldFileId = minFileId;
- wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
-
- return 0;
-}
-
-int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
- int64_t maxFileId = INT64_MIN;
-
- DIR *dir = opendir(pWal->path);
- if (dir == NULL) {
- wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
- return -1;
- }
-
- struct dirent *ent;
- while ((ent = readdir(dir)) != NULL) {
- char *name = ent->d_name;
-
- if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
- int64_t id = atoll(name + WAL_PREFIX_LEN);
- if (id > maxFileId) {
- maxFileId = id;
- }
- }
- }
- closedir(dir);
-
- if (maxFileId == INT64_MIN) {
- *newFileId = 0;
- } else {
- *newFileId = maxFileId;
- }
-
- wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
-
- return 0;
-}
-#endif
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 994b8fc333..ffbb19c6b7 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -21,98 +21,6 @@
#include "tfile.h"
#include "walInt.h"
-
-#if 0
-static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
-
-int32_t walRenew(void *handle) {
- if (handle == NULL) return 0;
-
- SWal * pWal = handle;
- int32_t code = 0;
-
- /*if (pWal->stop) {*/
- /*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
- /*return 0;*/
- /*}*/
-
- pthread_mutex_lock(&pWal->mutex);
-
- if (tfValid(pWal->logTfd)) {
- tfClose(pWal->logTfd);
- wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
- }
-
- /*if (pWal->keep == TAOS_WAL_KEEP) {*/
- /*pWal->fileId = 0;*/
- /*} else {*/
- /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
- /*pWal->fileId++;*/
- /*}*/
-
- snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
- pWal->logTfd = tfOpenCreateWrite(pWal->logName);
-
- if (!tfValid(pWal->logTfd)) {
- code = TAOS_SYSTEM_ERROR(errno);
- wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
- } else {
- wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
- }
-
- pthread_mutex_unlock(&pWal->mutex);
-
- return code;
-}
-
-void walRemoveOneOldFile(void *handle) {
- SWal *pWal = handle;
- if (pWal == NULL) return;
- /*if (pWal->keep == TAOS_WAL_KEEP) return;*/
- if (!tfValid(pWal->logTfd)) return;
-
- pthread_mutex_lock(&pWal->mutex);
-
- // remove the oldest wal file
- int64_t oldFileId = -1;
- if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
- char walName[WAL_FILE_LEN] = {0};
- snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
-
- if (remove(walName) < 0) {
- wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
- } else {
- wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
- }
- }
-
- pthread_mutex_unlock(&pWal->mutex);
-}
-
-void walRemoveAllOldFiles(void *handle) {
- if (handle == NULL) return;
-
- SWal * pWal = handle;
- int64_t fileId = -1;
-
- pthread_mutex_lock(&pWal->mutex);
-
- tfClose(pWal->logTfd);
- wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
-
- while (walGetNextFile(pWal, &fileId) >= 0) {
- snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
-
- if (remove(pWal->logName) < 0) {
- wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
- } else {
- wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
- }
- }
- pthread_mutex_unlock(&pWal->mutex);
-}
-#endif
-
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
@@ -166,7 +74,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
pthread_mutex_unlock(&pWal->mutex);
return -1;
}
- int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
+ int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry);
code = tfLseek(idxTfd, idxOff, SEEK_SET);
if(code < 0) {
pthread_mutex_unlock(&pWal->mutex);
@@ -229,7 +137,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
-int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
+int32_t walBeginSnapshot(SWal* pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver;
//check file rolling
if(pWal->cfg.retentionPeriod == 0) {
@@ -239,7 +147,7 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
return 0;
}
-int32_t walEndTakeSnapshot(SWal *pWal) {
+int32_t walEndSnapshot(SWal *pWal) {
int64_t ver = pWal->vers.verInSnapshotting;
if(ver == -1) return -1;
@@ -287,7 +195,7 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
pWal->vers.verInSnapshotting = -1;
//save snapshot ver, commit ver
- int code = walWriteMeta(pWal);
+ int code = walSaveMeta(pWal);
if(code != 0) {
return -1;
}
@@ -314,13 +222,13 @@ int walRoll(SWal *pWal) {
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
- idxTfd = tfOpenCreateWrite(fnameStr);
+ idxTfd = tfOpenCreateWriteAppend(fnameStr);
if(idxTfd < 0) {
ASSERT(0);
return -1;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
- logTfd = tfOpenCreateWrite(fnameStr);
+ logTfd = tfOpenCreateWriteAppend(fnameStr);
if(logTfd < 0) {
ASSERT(0);
return -1;
@@ -335,8 +243,6 @@ int walRoll(SWal *pWal) {
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
- //change status
- pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
pWal->lastRollSeq = walGetSeq();
return 0;
@@ -425,74 +331,6 @@ void walFsync(SWal *pWal, bool forceFsync) {
}
}
-#if 0
-int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
- if (handle == NULL) return -1;
-
- SWal * pWal = handle;
- int32_t count = 0;
- int32_t code = 0;
- int64_t fileId = -1;
-
- while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
- /*if (fileId == pWal->curFileId) continue;*/
-
- char walName[WAL_FILE_LEN];
- snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
-
- wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
- code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
- if (code != TSDB_CODE_SUCCESS) {
- wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
- continue;
- }
-
- wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);
-
- count++;
- }
-
- /*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/
-
- if (count == 0) {
- wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
- return walRenew(pWal);
- } else {
- // open the existing WAL file in append mode
- /*pWal->curFileId = 0;*/
- snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
- pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
- if (!tfValid(pWal->logTfd)) {
- wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
- wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);
- }
-
- return TSDB_CODE_SUCCESS;
-}
-
-int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
- if (handle == NULL) return -1;
- SWal *pWal = handle;
-
- if (*fileId == 0) *fileId = -1;
-
- pthread_mutex_lock(&(pWal->mutex));
-
- int32_t code = walGetNextFile(pWal, fileId);
- if (code >= 0) {
- sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
- /*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
- }
-
- wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
- pthread_mutex_unlock(&(pWal->mutex));
-
- return code;
-}
-#endif
-
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
/*int code = 0;*/
/*SWalHead *pHead = NULL;*/
@@ -516,139 +354,3 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
/*return 0;*/
/*}*/
-
-#if 0
-static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
- int64_t pos = *offset;
- while (1) {
- pos++;
-
- if (tfLseek(tfd, pos, SEEK_SET) < 0) {
- wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
- return TSDB_CODE_WAL_FILE_CORRUPTED;
- }
-
- if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
- wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
- return TSDB_CODE_WAL_FILE_CORRUPTED;
- }
-
- if (pHead->signature != WAL_SIGNATURE) {
- continue;
- }
-
- if (pHead->sver >= 1) {
- if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
- wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
- return TSDB_CODE_WAL_FILE_CORRUPTED;
- }
-
- if (walValidateChecksum(pHead)) {
- wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
- *offset = pos;
- return TSDB_CODE_SUCCESS;
- }
- }
- }
-
- return TSDB_CODE_WAL_FILE_CORRUPTED;
-}
-
-static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
- int32_t size = WAL_MAX_SIZE;
- void * buffer = malloc(size);
- if (buffer == NULL) {
- wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
-
- int64_t tfd = tfOpenReadWrite(name);
- if (!tfValid(tfd)) {
- wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
- tfree(buffer);
- return TAOS_SYSTEM_ERROR(errno);
- } else {
- wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name);
- }
-
- int32_t code = TSDB_CODE_SUCCESS;
- int64_t offset = 0;
- SWalHead *pHead = buffer;
-
- while (1) {
- int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
- if (ret == 0) break;
-
- if (ret < 0) {
- wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno));
- code = TAOS_SYSTEM_ERROR(errno);
- break;
- }
-
- if (ret < sizeof(SWalHead)) {
- wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
- walFtruncate(pWal, tfd, offset);
- break;
- }
-
- if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
- wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
- pHead->version, pHead->len, offset);
- code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
- if (code != TSDB_CODE_SUCCESS) {
- walFtruncate(pWal, tfd, offset);
- break;
- }
- }
-
- if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
- wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
- pHead->version, pHead->len, offset);
- code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
- if (code != TSDB_CODE_SUCCESS) {
- walFtruncate(pWal, tfd, offset);
- break;
- }
- }
-
- ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
- if (ret < 0) {
- wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
- code = TAOS_SYSTEM_ERROR(errno);
- break;
- }
-
- if (ret < pHead->len) {
- wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
- offset += sizeof(SWalHead);
- continue;
- }
-
- if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
- wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
- pHead->version, pHead->len, offset);
- code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
- if (code != TSDB_CODE_SUCCESS) {
- walFtruncate(pWal, tfd, offset);
- break;
- }
- }
-
- offset = offset + sizeof(SWalHead) + pHead->len;
-
- wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
- pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);
-
- pWal->curVersion = pHead->version;
-
- // wInfo("writeFp: %ld", offset);
- (*writeFp)(pVnode, pHead);
- }
-
- tfClose(tfd);
- tfree(buffer);
-
- wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
- return code;
-}
-#endif
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index 200bf39c5a..d06388201e 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -142,7 +142,7 @@ TEST_F(WalCleanEnv, serialize) {
char*ss = walMetaSerialize(pWal);
printf("%s\n", ss);
free(ss);
- code = walWriteMeta(pWal);
+ code = walSaveMeta(pWal);
ASSERT(code == 0);
}
@@ -150,11 +150,11 @@ TEST_F(WalCleanEnv, removeOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
- code = walWriteMeta(pWal);
+ code = walSaveMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
- code = walWriteMeta(pWal);
+ code = walSaveMeta(pWal);
ASSERT(code == 0);
}
@@ -199,7 +199,7 @@ TEST_F(WalCleanEnv, write) {
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
- code = walWriteMeta(pWal);
+ code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
@@ -216,7 +216,7 @@ TEST_F(WalCleanEnv, rollback) {
code = walRollback(pWal, 3);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 2);
- code = walWriteMeta(pWal);
+ code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
@@ -231,9 +231,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- walBeginTakeSnapshot(pWal, i-1);
+ walBeginSnapshot(pWal, i-1);
ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
- walEndTakeSnapshot(pWal);
+ walEndSnapshot(pWal);
ASSERT_EQ(pWal->vers.snapshotVer, i-1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
@@ -247,9 +247,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- code = walBeginTakeSnapshot(pWal, i - 1);
+ code = walBeginSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0);
- code = walEndTakeSnapshot(pWal);
+ code = walEndSnapshot(pWal);
ASSERT_EQ(code, 0);
}
diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c
deleted file mode 100644
index 59fb4f334d..0000000000
--- a/src/tsdb/src/tsdbCommitQueue.c
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#include "tsdbint.h"
-
-typedef struct {
- bool stop;
- pthread_mutex_t lock;
- pthread_cond_t queueNotEmpty;
- int nthreads;
- int refCount;
- SList * queue;
- pthread_t * threads;
-} SCommitQueue;
-
-typedef struct {
- TSDB_REQ_T req;
- STsdbRepo *pRepo;
-} SReq;
-
-static void *tsdbLoopCommit(void *arg);
-
-static SCommitQueue tsCommitQueue = {0};
-
-int tsdbInitCommitQueue() {
- int nthreads = tsNumOfCommitThreads;
- SCommitQueue *pQueue = &tsCommitQueue;
-
- if (nthreads < 1) nthreads = 1;
-
- pQueue->stop = false;
- pQueue->nthreads = nthreads;
-
- pQueue->queue = tdListNew(0);
- if (pQueue->queue == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- return -1;
- }
-
- pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t));
- if (pQueue->threads == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- tdListFree(pQueue->queue);
- return -1;
- }
-
- pthread_mutex_init(&(pQueue->lock), NULL);
- pthread_cond_init(&(pQueue->queueNotEmpty), NULL);
-
- for (int i = 0; i < nthreads; i++) {
- pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL);
- }
-
- return 0;
-}
-
-void tsdbDestroyCommitQueue() {
- SCommitQueue *pQueue = &tsCommitQueue;
-
- pthread_mutex_lock(&(pQueue->lock));
-
- if (pQueue->stop) {
- pthread_mutex_unlock(&(pQueue->lock));
- return;
- }
-
- pQueue->stop = true;
- pthread_cond_broadcast(&(pQueue->queueNotEmpty));
-
- pthread_mutex_unlock(&(pQueue->lock));
-
- for (size_t i = 0; i < pQueue->nthreads; i++) {
- pthread_join(pQueue->threads[i], NULL);
- }
-
- free(pQueue->threads);
- tdListFree(pQueue->queue);
- pthread_cond_destroy(&(pQueue->queueNotEmpty));
- pthread_mutex_destroy(&(pQueue->lock));
-}
-
-int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
- SCommitQueue *pQueue = &tsCommitQueue;
-
- SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
- if (pNode == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- return -1;
- }
-
- ((SReq *)pNode->data)->req = req;
- ((SReq *)pNode->data)->pRepo = pRepo;
-
- pthread_mutex_lock(&(pQueue->lock));
-
- // ASSERT(pQueue->stop);
-
- tdListAppendNode(pQueue->queue, pNode);
- pthread_cond_signal(&(pQueue->queueNotEmpty));
-
- pthread_mutex_unlock(&(pQueue->lock));
- return 0;
-}
-
-static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
- pthread_mutex_lock(&pRepo->save_mutex);
-
- pRepo->config_changed = false;
- STsdbCfg * pSaveCfg = &pRepo->save_config;
- STsdbCfg oldCfg;
- int32_t oldTotalBlocks = pRepo->config.totalBlocks;
-
- memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg));
-
- pRepo->config.compression = pRepo->save_config.compression;
- pRepo->config.keep = pRepo->save_config.keep;
- pRepo->config.keep1 = pRepo->save_config.keep1;
- pRepo->config.keep2 = pRepo->save_config.keep2;
- pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
- pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
-
- pthread_mutex_unlock(&pRepo->save_mutex);
-
- tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)",
- REPO_ID(pRepo),
- pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
- pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
-
- int err = tsdbExpandPool(pRepo, oldTotalBlocks);
- if (!TAOS_SUCCEEDED(err)) {
- tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
- REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
- }
-
- if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
- if (tsdbLockRepo(pRepo) < 0) return;
- tsdbCacheLastData(pRepo, &oldCfg);
- tsdbUnlockRepo(pRepo);
- }
-
-}
-
-static void *tsdbLoopCommit(void *arg) {
- SCommitQueue *pQueue = &tsCommitQueue;
- SListNode * pNode = NULL;
- STsdbRepo * pRepo = NULL;
- TSDB_REQ_T req;
-
- setThreadName("tsdbCommit");
-
- while (true) {
- pthread_mutex_lock(&(pQueue->lock));
-
- while (true) {
- pNode = tdListPopHead(pQueue->queue);
- if (pNode == NULL) {
- if (pQueue->stop && pQueue->refCount <= 0) {
- pthread_mutex_unlock(&(pQueue->lock));
- goto _exit;
- } else {
- pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock));
- }
- } else {
- break;
- }
- }
-
- pthread_mutex_unlock(&(pQueue->lock));
-
- req = ((SReq *)pNode->data)->req;
- pRepo = ((SReq *)pNode->data)->pRepo;
-
- if (req == COMMIT_REQ) {
- tsdbCommitData(pRepo);
- } else if (req == COMPACT_REQ) {
- tsdbCompactImpl(pRepo);
- } else if (req == COMMIT_CONFIG_REQ) {
- ASSERT(pRepo->config_changed);
- tsdbApplyRepoConfig(pRepo);
- tsem_post(&(pRepo->readyToCommit));
- } else {
- ASSERT(0);
- }
-
- listNodeFree(pNode);
- }
-
-_exit:
- return NULL;
-}
-
-void tsdbIncCommitRef(int vgId) {
- int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1);
- tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount);
-}
-
-void tsdbDecCommitRef(int vgId) {
- int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
- pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
- tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
-}