commit
e75983e9d5
|
@ -4,24 +4,24 @@ sidebar_label: Documentation Home
|
|||
slug: /
|
||||
---
|
||||
|
||||
TDengine is a [high-performance](https://tdengine.com/fast), [scalable](https://tdengine.com/scalable) time series database with [SQL support](https://tdengine.com/sql-support). This document is the TDengine user manual. It introduces the basic concepts, installation, features, SQL, APIs, operation, maintenance, kernel design, etc. It’s written mainly for architects, developers and system administrators.
|
||||
TDengine is a [high-performance](https://tdengine.com/fast), [scalable](https://tdengine.com/scalable) time series database with [SQL support](https://tdengine.com/sql-support). This document is the TDengine user manual. It introduces the basic, as well as novel concepts, in TDengine, and also talks in detail about installation, features, SQL, APIs, operation, maintenance, kernel design and other topics. It’s written mainly for architects, developers and system administrators.
|
||||
|
||||
To get a global view about TDengine, like feature list, benchmarks, and competitive advantages, please browse through section [Introduction](./intro).
|
||||
To get an overview of TDengine, such as a feature list, benchmarks, and competitive advantages, please browse through the [Introduction](./intro) section.
|
||||
|
||||
TDengine makes full use of the characteristics of time series data, proposes the concepts of "one table for one data collection point" and "super table", and designs an innovative storage engine, which greatly improves the efficiency of data ingestion, querying and storage. To understand the new concepts and use TDengine in the right way, please read [“Concepts”](./concept) thoroughly.
|
||||
TDengine greatly improves the efficiency of data ingestion, querying and storage by exploiting the characteristics of time series data, introducing the novel concepts of "one table for one data collection point" and "super table", and designing an innovative storage engine. To understand the new concepts in TDengine and make full use of the features and capabilities of TDengine, please read [“Concepts”](./concept) thoroughly.
|
||||
|
||||
If you are a developer, please read the [“Developer Guide”](./develop) carefully. This section introduces the database connection, data modeling, data ingestion, query, continuous query, cache, data subscription, user-defined function, etc. in detail. Sample code is provided for a variety of programming languages. In most cases, you can just copy and paste the sample code, make a few changes to accommodate your application, and it will work.
|
||||
If you are a developer, please read the [“Developer Guide”](./develop) carefully. This section introduces the database connection, data modeling, data ingestion, query, continuous query, cache, data subscription, user-defined functions, and other functionality in detail. Sample code is provided for a variety of programming languages. In most cases, you can just copy and paste the sample code, make a few changes to accommodate your application, and it will work.
|
||||
|
||||
We live in the era of big data, and scale-up is unable to meet the growing business needs. Any modern data system must have the ability to scale out, and clustering has become an indispensable feature of big data systems. The TDengine team has not only developed the cluster feature, they also decided to open source this important feature. To learn how to deploy, manage and maintain a TDengine cluster please refer to ["Cluster"](./cluster).
|
||||
We live in the era of big data, and scale-up is unable to meet the growing needs of business. Any modern data system must have the ability to scale out, and clustering has become an indispensable feature of big data systems. Not only did the TDengine team develop the cluster feature, but also decided to open source this important feature. To learn how to deploy, manage and maintain a TDengine cluster please refer to ["cluster"](./cluster).
|
||||
|
||||
TDengine uses SQL as its query language, which greatly reduces learning costs and migration costs. In addition to the standard SQL, TDengine has extensions to support time series data scenarios better, such as roll up, interpolation, time weighted average, etc. The ["SQL Reference"](./taos-sql) chapter describes the SQL syntax in detail, and lists the various supported commands and functions.
|
||||
TDengine uses ubiquitious SQL as its query language, which greatly reduces learning costs and migration costs. In addition to the standard SQL, TDengine has extensions to better support time series data analysis. These extensions include functions such as roll up, interpolation and time weighted average, among many others. The ["SQL Reference"](./taos-sql) chapter describes the SQL syntax in detail, and lists the various supported commands and functions.
|
||||
|
||||
If you are a system administrator who cares about installation, upgrade, fault tolerance, disaster recovery, data import, data export, system configuration, how to monitor whether TDengine is running healthily, and how to improve system performance, please refer to the ["Administration"](./operation) thoroughly.
|
||||
If you are a system administrator who cares about installation, upgrade, fault tolerance, disaster recovery, data import, data export, system configuration, how to monitor whether TDengine is running healthily, and how to improve system performance, please refer to, and thoroughly read the ["Administration"](./operation) section.
|
||||
|
||||
If you want to know more about TDengine tools, REST API, and connectors for various programming languages, please see the ["Reference"](./reference) chapter.
|
||||
If you want to know more about TDengine tools, the REST API, and connectors for various programming languages, please see the ["Reference"](./reference) chapter.
|
||||
|
||||
If you are very interested in the internal design of TDengine, please read the chapter ["Inside TDengine”](./tdinternal), which introduces the cluster design, data partitioning, sharding, writing, and reading processes in detail. If you want to study TDengine code or even contribute code, please read this chapter carefully.
|
||||
|
||||
TDengine is an open source database, you are welcome to be a part of TDengine. If you find any errors in the documentation, or the description is not clear, please click "Edit this page" at the bottom of each page to edit it directly.
|
||||
TDengine is an open source database, and we would love for you to be a part of TDengine. If you find any errors in the documentation, or see parts where more clarity or elaboration is needed, please click "Edit this page" at the bottom of each page to edit it directly.
|
||||
|
||||
Together, we make a difference.
|
||||
|
|
|
@ -96,6 +96,7 @@ extern char *qtypeStr[];
|
|||
#define TSDB_PORT_HTTP 11
|
||||
|
||||
#undef TD_DEBUG_PRINT_ROW
|
||||
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -158,6 +158,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
|
|
@ -89,6 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
|||
* @return int32_t 0 for success, -1 for failure.
|
||||
*/
|
||||
int32_t mndProcessMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
|
||||
|
||||
/**
|
||||
* @brief Generate machine code
|
||||
|
|
|
@ -304,13 +304,16 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
|||
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
||||
|
||||
/**
|
||||
* @brief Update the version of sdb
|
||||
* @brief Update the index of sdb
|
||||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @param val The update value of the version.
|
||||
* @return int32_t The current version of sdb
|
||||
* @param index The update value of the apply index.
|
||||
* @return int32_t The current index of sdb
|
||||
*/
|
||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val);
|
||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
|
||||
int64_t sdbGetApplyIndex(SSdb *pSdb);
|
||||
void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
|
||||
int64_t sdbGetApplyTerm(SSdb *pSdb);
|
||||
|
||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||
|
@ -339,6 +342,7 @@ typedef struct SSdb {
|
|||
char *tmpDir;
|
||||
int64_t lastCommitVer;
|
||||
int64_t curVer;
|
||||
int64_t curTerm;
|
||||
int64_t tableVer[SDB_MAX];
|
||||
int64_t maxId[SDB_MAX];
|
||||
EKeyType keyTypes[SDB_MAX];
|
||||
|
|
|
@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
|
|||
int32_t code;
|
||||
ESyncState state;
|
||||
uint64_t seqNum;
|
||||
SyncTerm term;
|
||||
SyncTerm currentTerm;
|
||||
} SFsmCbMeta;
|
||||
|
||||
typedef struct SSyncFSM {
|
||||
|
@ -85,6 +87,7 @@ typedef struct SSyncFSM {
|
|||
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
|
||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
|
||||
} SSyncFSM;
|
||||
|
@ -117,7 +120,6 @@ typedef struct SSyncLogStore {
|
|||
|
||||
} SSyncLogStore;
|
||||
|
||||
|
||||
typedef struct SSyncInfo {
|
||||
SyncGroupId vgId;
|
||||
SSyncCfg syncCfg;
|
||||
|
|
|
@ -28,7 +28,7 @@ extern "C" {
|
|||
#define TAOS_CONN_CLIENT 1
|
||||
#define IsReq(pMsg) (pMsg->msgType & 1U)
|
||||
|
||||
extern int tsRpcHeadSize;
|
||||
extern int32_t tsRpcHeadSize;
|
||||
|
||||
typedef struct {
|
||||
uint32_t clientIp;
|
||||
|
@ -69,10 +69,10 @@ typedef struct SRpcInit {
|
|||
char localFqdn[TSDB_FQDN_LEN];
|
||||
uint16_t localPort; // local port
|
||||
char * label; // for debug purpose
|
||||
int numOfThreads; // number of threads to handle connections
|
||||
int sessions; // number of sessions allowed
|
||||
int32_t numOfThreads; // number of threads to handle connections
|
||||
int32_t sessions; // number of sessions allowed
|
||||
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||
int idleTime; // milliseconds, 0 means idle timer is disabled
|
||||
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
|
||||
|
||||
// the following is for client app ecurity only
|
||||
char *user; // user name
|
||||
|
@ -108,9 +108,9 @@ int32_t rpcInit();
|
|||
void rpcCleanup();
|
||||
void * rpcOpen(const SRpcInit *pRpc);
|
||||
void rpcClose(void *);
|
||||
void * rpcMallocCont(int contLen);
|
||||
void * rpcMallocCont(int32_t contLen);
|
||||
void rpcFreeCont(void *pCont);
|
||||
void * rpcReallocCont(void *ptr, int contLen);
|
||||
void * rpcReallocCont(void *ptr, int32_t contLen);
|
||||
|
||||
// Because taosd supports multi-process mode
|
||||
// These functions should not be used on the server side
|
||||
|
@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
|||
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
|
||||
|
||||
// These functions will not be called in the child process
|
||||
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
|
||||
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
|
||||
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -253,6 +253,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
|
||||
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
|
||||
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4)
|
||||
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D5)
|
||||
|
||||
// mnode-mq
|
||||
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
||||
|
|
|
@ -82,7 +82,7 @@ typedef struct {
|
|||
do { \
|
||||
SEncoder coder = {0}; \
|
||||
tEncoderInit(&coder, NULL, 0); \
|
||||
if ((E)(&coder, S) == 0) { \
|
||||
if ((E)(&coder, S) >= 0) { \
|
||||
SIZE = coder.pos; \
|
||||
RET = 0; \
|
||||
} else { \
|
||||
|
|
|
@ -24,19 +24,22 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SMnodeMgmt {
|
||||
SDnodeData *pData;
|
||||
SMnode *pMnode;
|
||||
SMsgCb msgCb;
|
||||
const char *path;
|
||||
const char *name;
|
||||
SSingleWorker queryWorker;
|
||||
SSingleWorker readWorker;
|
||||
SSingleWorker writeWorker;
|
||||
SSingleWorker syncWorker;
|
||||
SSingleWorker monitorWorker;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SDnodeData *pData;
|
||||
SMnode *pMnode;
|
||||
SMsgCb msgCb;
|
||||
const char *path;
|
||||
const char *name;
|
||||
SSingleWorker queryWorker;
|
||||
SSingleWorker readWorker;
|
||||
SSingleWorker writeWorker;
|
||||
SSingleWorker syncWorker;
|
||||
SSingleWorker monitorWorker;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
bool stopped;
|
||||
int32_t refCount;
|
||||
TdThreadRwlock lock;
|
||||
} SMnodeMgmt;
|
||||
|
||||
// mmFile.c
|
||||
|
@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
|
|||
|
||||
// mmInt.c
|
||||
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
|
||||
int32_t mmAcquire(SMnodeMgmt *pMgmt);
|
||||
void mmRelease(SMnodeMgmt *pMgmt);
|
||||
|
||||
// mmHandle.c
|
||||
SArray *mmGetMsgHandles();
|
||||
|
|
|
@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
|
|
@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
|
|||
if (pMgmt->pMnode != NULL) {
|
||||
mmStopWorker(pMgmt);
|
||||
mndClose(pMgmt->pMnode);
|
||||
taosThreadRwlockDestroy(&pMgmt->lock);
|
||||
pMgmt->pMnode = NULL;
|
||||
}
|
||||
|
||||
|
@ -122,6 +123,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (syncInit() != 0) {
|
||||
dError("failed to init sync since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -137,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
|
||||
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
|
||||
pMgmt->msgCb.mgmt = pMgmt;
|
||||
taosThreadRwlockInit(&pMgmt->lock, NULL);
|
||||
|
||||
bool deployed = false;
|
||||
if (mmReadFile(pMgmt, &deployed) != 0) {
|
||||
|
@ -206,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
|
|||
|
||||
return mgmtFunc;
|
||||
}
|
||||
|
||||
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
if (pMgmt->stopped) {
|
||||
code = -1;
|
||||
} else {
|
||||
atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
void mmRelease(SMnodeMgmt *pMgmt) {
|
||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
}
|
|
@ -56,6 +56,12 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
mndProcessSyncMsg(pMsg);
|
||||
}
|
||||
|
||||
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
|
||||
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
|
@ -105,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||
if (mmAcquire(pMgmt) != 0) return -1;
|
||||
int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||
mmRelease(pMgmt);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
|
@ -149,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.min = 1,
|
||||
.max = 1,
|
||||
.name = "mnode-sync",
|
||||
.fp = (FItem)mmProcessQueue,
|
||||
.fp = (FItem)mmProcessSyncQueue,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
|
||||
|
@ -174,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void mmStopWorker(SMnodeMgmt *pMgmt) {
|
||||
taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
pMgmt->stopped = 1;
|
||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
while (pMgmt->refCount > 0) taosMsleep(10);
|
||||
|
||||
tSingleWorkerCleanup(&pMgmt->monitorWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->queryWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->readWorker);
|
||||
|
|
|
@ -4,7 +4,7 @@ target_link_libraries(
|
|||
dmnodeTest sut
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME dmnodeTest
|
||||
COMMAND dmnodeTest
|
||||
)
|
||||
#add_test(
|
||||
# NAME dmnodeTest
|
||||
# COMMAND dmnodeTest
|
||||
#)
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "mndDef.h"
|
||||
|
||||
#include "sdb.h"
|
||||
#include "syncTools.h"
|
||||
#include "tcache.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -31,12 +32,14 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// clang-format off
|
||||
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||
// clang-format on
|
||||
|
||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
|
@ -72,10 +75,11 @@ typedef struct {
|
|||
} STelemMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t errCode;
|
||||
sem_t syncSem;
|
||||
SWal *pWal;
|
||||
SSyncNode *pSyncNode;
|
||||
int32_t errCode;
|
||||
bool restored;
|
||||
sem_t syncSem;
|
||||
int64_t sync;
|
||||
ESyncState state;
|
||||
} SSyncMgmt;
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ int32_t mndInitSync(SMnode *pMnode);
|
|||
void mndCleanupSync(SMnode *pMnode);
|
||||
bool mndIsMaster(SMnode *pMnode);
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
|
||||
void mndSyncStart(SMnode *pMnode);
|
||||
void mndSyncStop(SMnode *pMnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|||
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
||||
|
||||
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -419,7 +419,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
||||
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
|
|
@ -448,13 +448,13 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
|
|||
}
|
||||
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -524,13 +524,13 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
|
|||
}
|
||||
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndDnodeActionEncode(pDnode);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
|
|
@ -153,6 +153,7 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa
|
|||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
||||
// commit log or redo log?
|
||||
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -188,7 +189,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
|
|||
pOffsetObj->offset = pOffset->offset;
|
||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
|
||||
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
||||
mndTransAppendRedolog(pTrans, pOffsetRaw);
|
||||
mndTransAppendCommitlog(pTrans, pOffsetRaw);
|
||||
if (create) {
|
||||
taosMemoryFree(pOffsetObj);
|
||||
} else {
|
||||
|
|
|
@ -743,9 +743,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
|
|||
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
|
||||
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER;
|
||||
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||
|
||||
|
|
|
@ -279,13 +279,13 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
}
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pStream->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -479,7 +479,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
|
||||
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,10 +17,60 @@
|
|||
#include "mndSync.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
static int32_t mndInitWal(SMnode *pMnode) {
|
||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||
|
||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
|
||||
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
|
||||
sdbWriteWithoutFree(pSdb, pRaw);
|
||||
sdbSetApplyIndex(pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pSdb, cbMeta.term);
|
||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
mndTransPullup(pMnode);
|
||||
pMnode->syncMgmt.restored = true;
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pMnode;
|
||||
pFsm->FpCommitCb = mndSyncCommitMsg;
|
||||
pFsm->FpPreCommitCb = NULL;
|
||||
pFsm->FpRollBackCb = NULL;
|
||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinish = mndRestoreFinish;
|
||||
pFsm->FpRestoreSnapshot = NULL;
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
int32_t mndInitSync(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
||||
char path[PATH_MAX] = {0};
|
||||
char path[PATH_MAX + 20] = {0};
|
||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||
SWalCfg cfg = {
|
||||
.vgId = 1,
|
||||
|
@ -31,164 +81,88 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
|||
.retentionSize = -1,
|
||||
.level = TAOS_WAL_FSYNC,
|
||||
};
|
||||
|
||||
pMgmt->pWal = walOpen(path, &cfg);
|
||||
if (pMgmt->pWal == NULL) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndCloseWal(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
if (pMgmt->pWal != NULL) {
|
||||
walClose(pMgmt->pWal);
|
||||
pMgmt->pWal = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
|
||||
int32_t code = -1;
|
||||
|
||||
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
|
||||
if (pHandle == NULL) return -1;
|
||||
|
||||
int64_t first = walGetFirstVer(pWal);
|
||||
int64_t last = walGetLastVer(pWal);
|
||||
mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
||||
|
||||
first = TMAX(lastSdbVer + 1, first);
|
||||
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
||||
if (walReadWithHandle(pHandle, ver) < 0) {
|
||||
mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SWalHead *pHead = pHandle->pHead;
|
||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||
if (sdbVer + 1 != ver) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
||||
mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
|
||||
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
|
||||
mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
sdbUpdateVer(pSdb, 1);
|
||||
mDebug("ver:%" PRId64 ", is restored", ver);
|
||||
}
|
||||
|
||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||
mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
|
||||
|
||||
mndTransPullup(pMnode);
|
||||
sdbVer = sdbUpdateVer(pSdb, 0);
|
||||
mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);
|
||||
|
||||
if (sdbVer != lastSdbVer) {
|
||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||
if (sdbWriteFile(pSdb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (walCommit(pWal, sdbVer) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (walEndSnapshot(pWal) < 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
walCloseReadHandle(pHandle);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndInitSync(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
|
||||
if (mndInitWal(pMnode) < 0) {
|
||||
if (pMgmt->pWal == NULL) {
|
||||
mError("failed to open wal since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndRestoreWal(pMnode) < 0) {
|
||||
mError("failed to restore wal since %s", terrstr());
|
||||
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||
syncInfo.pWal = pMgmt->pWal;
|
||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = pMnode->replica;
|
||||
pCfg->myIndex = pMnode->selfIndex;
|
||||
for (int32_t i = 0; i < pMnode->replica; ++i) {
|
||||
SNodeInfo *pNode = &pCfg->nodeInfo[i];
|
||||
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodePort = pMnode->replicas[i].port;
|
||||
}
|
||||
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
pMgmt->sync = syncOpen(&syncInfo);
|
||||
if (pMgmt->sync <= 0) {
|
||||
mError("failed to open sync since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pMnode->selfId == 1) {
|
||||
pMgmt->state = TAOS_SYNC_STATE_LEADER;
|
||||
}
|
||||
pMgmt->pSyncNode = NULL;
|
||||
mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndCleanupSync(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
syncStop(pMgmt->sync);
|
||||
mDebug("sync:%" PRId64 " is stopped", pMgmt->sync);
|
||||
|
||||
tsem_destroy(&pMgmt->syncSem);
|
||||
mndCloseWal(pMnode);
|
||||
}
|
||||
if (pMgmt->pWal != NULL) {
|
||||
walClose(pMgmt->pWal);
|
||||
}
|
||||
|
||||
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
|
||||
SMnode *pMnode = pData;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
||||
pMgmt->errCode = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
|
||||
return 0;
|
||||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||
}
|
||||
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int64_t ver = sdbUpdateVer(pSdb, 1);
|
||||
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
||||
sdbUpdateVer(pSdb, -1);
|
||||
mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
|
||||
walCommit(pWal, ver);
|
||||
walFsync(pWal, true);
|
||||
|
||||
#if 1
|
||||
return 0;
|
||||
#else
|
||||
if (pMnode->replica == 1) return 0;
|
||||
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
pMgmt->errCode = 0;
|
||||
|
||||
SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
|
||||
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
if (rsp.pCont == NULL) return -1;
|
||||
memcpy(rsp.pCont, pRaw, rsp.contLen);
|
||||
|
||||
bool isWeak = false;
|
||||
int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak);
|
||||
const bool isWeak = false;
|
||||
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
|
||||
if (code == 0) {
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
if (code != 0) return code;
|
||||
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
return pMgmt->errCode;
|
||||
#endif
|
||||
}
|
||||
|
||||
void mndSyncStart(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||
syncStart(pMgmt->sync);
|
||||
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
|
||||
}
|
||||
|
||||
void mndSyncStop(SMnode *pMnode) {}
|
||||
|
||||
bool mndIsMaster(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
return pMgmt->state == TAOS_SYNC_STATE_LEADER;
|
||||
pMgmt->state = syncGetMyRole(pMgmt->sync);
|
||||
|
||||
return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
|
||||
}
|
||||
|
|
|
@ -386,14 +386,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
}
|
||||
mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -473,13 +473,13 @@ CREATE_TOPIC_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -627,11 +627,11 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -682,13 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
mDebug("trans:%d, sync finished", pTrans->id);
|
||||
|
||||
code = sdbWrite(pMnode->pSdb, pRaw);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -768,6 +761,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pTrans->commitLogs) <= 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, prepare transaction", pTrans->id);
|
||||
if (mndTransSync(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -1080,6 +1079,8 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||
if (!mndIsMaster(pMnode)) return false;
|
||||
|
||||
bool continueExec = true;
|
||||
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
|
||||
|
||||
|
@ -1169,6 +1170,8 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||
if (!mndIsMaster(pMnode)) return false;
|
||||
|
||||
bool continueExec = true;
|
||||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
||||
|
||||
|
@ -1350,19 +1353,35 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
void mndTransPullup(SMnode *pMnode) {
|
||||
STrans *pTrans = NULL;
|
||||
void *pIter = NULL;
|
||||
static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }
|
||||
|
||||
void mndTransPullup(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_TRANS), sizeof(int32_t));
|
||||
if (pArray == NULL) return;
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
STrans *pTrans = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
|
||||
if (pIter == NULL) break;
|
||||
taosArrayPush(pArray, &pTrans->id);
|
||||
sdbRelease(pSdb, pTrans);
|
||||
}
|
||||
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareTransId);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
int32_t *pTransId = taosArrayGet(pArray, i);
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
|
||||
if (pTrans != NULL) {
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
}
|
||||
mndReleaseTrans(pMnode, pTrans);
|
||||
}
|
||||
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
|
|
|
@ -272,13 +272,13 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
|
|||
}
|
||||
mDebug("trans:%d, used to create user:%s", pTrans->id, pCreate->user);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -352,13 +352,13 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
|
|||
}
|
||||
mDebug("trans:%d, used to alter user:%s", pTrans->id, pOld->user);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(pNew);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndUserActionEncode(pNew);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -559,13 +559,13 @@ static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
|
|||
}
|
||||
mDebug("trans:%d, used to drop user:%s", pTrans->id, pUser->user);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(pUser);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
SSdbRaw *pCommitRaw = mndUserActionEncode(pUser);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
|
|
@ -86,7 +86,6 @@ static void *mndThreadFp(void *param) {
|
|||
lastTime++;
|
||||
taosMsleep(100);
|
||||
if (pMnode->stopped) break;
|
||||
if (!mndIsMaster(pMnode)) continue;
|
||||
|
||||
if (lastTime % (tsTransPullupInterval * 10) == 0) {
|
||||
mndPullupTrans(pMnode);
|
||||
|
@ -336,9 +335,107 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
|
||||
int32_t mndStart(SMnode *pMnode) {
|
||||
mndSyncStart(pMnode);
|
||||
return mndInitTimer(pMnode);
|
||||
}
|
||||
|
||||
void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
|
||||
void mndStop(SMnode *pMnode) {
|
||||
mndSyncStop(pMnode);
|
||||
return mndCleanupTimer(pMnode);
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
void *ahandle = pMsg->info.ahandle;
|
||||
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
|
||||
if (syncEnvIsStart()) {
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync);
|
||||
SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
||||
char logBuf[512];
|
||||
char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType);
|
||||
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
} else {
|
||||
mError("==mndProcessSyncMsg== error syncEnv stop");
|
||||
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
@ -346,7 +443,8 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
|||
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
||||
|
||||
if (IsReq(pMsg)) {
|
||||
if (!mndIsMaster(pMnode)) {
|
||||
if (!mndIsMaster(pMnode) && pMsg->msgType != TDMT_MND_TRANS_TIMER && pMsg->msgType != TDMT_MND_MQ_TIMER &&
|
||||
pMsg->msgType != TDMT_MND_TELEM_TIMER) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||
return -1;
|
||||
|
|
|
@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
|||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
||||
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
|
||||
sdbSetApplyIndex(pSdb, -1);
|
||||
ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
|
||||
ASSERT_EQ(mnode.insertTimes, 2);
|
||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||
|
||||
|
@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
|||
|
||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3);
|
||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
|
||||
ASSERT_EQ(mnode.insertTimes, 3);
|
||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||
|
||||
|
@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
|||
}
|
||||
|
||||
// write version
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1);
|
||||
sdbSetApplyIndex(pSdb, 0);
|
||||
sdbSetApplyIndex(pSdb, 1);
|
||||
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
|
||||
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
||||
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
||||
|
||||
|
@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
|
|||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
||||
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
|
||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1);
|
||||
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
|
||||
ASSERT_EQ(mnode.insertTimes, 4);
|
||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ target_include_directories(
|
|||
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
|
||||
)
|
||||
add_test(
|
||||
NAME transTest2
|
||||
COMMAND transTest2
|
||||
)
|
||||
#add_test(
|
||||
# NAME transTest2
|
||||
# COMMAND transTest2
|
||||
#)
|
||||
|
|
|
@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t putToQueue(void *pMgmt, SRpcMsg *pMsg) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
class MndTestTrans2 : public ::testing::Test {
|
||||
protected:
|
||||
static void InitLog() {
|
||||
|
@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
msgCb.reportStartupFp = reportStartup;
|
||||
msgCb.sendReqFp = sendReq;
|
||||
msgCb.sendRspFp = sendRsp;
|
||||
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
|
||||
msgCb.queueFps[WRITE_QUEUE] = putToQueue;
|
||||
msgCb.queueFps[READ_QUEUE] = putToQueue;
|
||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||
tmsgSetDefault(&msgCb);
|
||||
|
||||
|
@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
static void SetUpTestSuite() {
|
||||
InitLog();
|
||||
walInit();
|
||||
syncInit();
|
||||
InitMnode();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
|||
char path[PATH_MAX + 100] = {0};
|
||||
snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
|
||||
pSdb->currDir = strdup(path);
|
||||
snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP);
|
||||
pSdb->syncDir = strdup(path);
|
||||
snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
|
||||
pSdb->tmpDir = strdup(path);
|
||||
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
|
||||
if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) {
|
||||
sdbCleanup(pSdb);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed to init sdb since %s", terrstr());
|
||||
|
@ -55,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
|||
}
|
||||
|
||||
pSdb->curVer = -1;
|
||||
pSdb->curTerm = -1;
|
||||
pSdb->lastCommitVer = -1;
|
||||
pSdb->pMnode = pOption->pMnode;
|
||||
mDebug("sdb init successfully");
|
||||
|
@ -149,12 +148,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosMkDir(pSdb->syncDir) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosMkDir(pSdb->tmpDir) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
|
||||
|
@ -164,4 +157,10 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); }
|
||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
||||
|
||||
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
||||
|
||||
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
|
||||
|
||||
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
|
||||
|
|
|
@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||
int64_t maxId = 0;
|
||||
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
|
||||
|
@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||
int64_t maxId = 0;
|
||||
if (i < SDB_MAX) {
|
||||
|
@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
|||
if (sdbReadFileHead(pSdb, pFile) != 0) {
|
||||
mError("failed to read file:%s head since %s", file, terrstr());
|
||||
pSdb->curVer = -1;
|
||||
pSdb->curTerm = -1;
|
||||
taosMemoryFree(pRaw);
|
||||
taosCloseFile(&pFile);
|
||||
return -1;
|
||||
|
@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
char curfile[PATH_MAX] = {0};
|
||||
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||
|
||||
mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
|
||||
pSdb->lastCommitVer);
|
||||
mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
|
||||
pSdb->curTerm, pSdb->lastCommitVer);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
|
@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
||||
} else {
|
||||
pSdb->lastCommitVer = pSdb->curVer;
|
||||
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
|
||||
mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
|
|
|
@ -20,9 +20,9 @@
|
|||
|
||||
#include "executor.h"
|
||||
#include "os.h"
|
||||
#include "tcache.h"
|
||||
#include "thash.h"
|
||||
#include "tmsg.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "wal.h"
|
||||
|
@ -86,6 +86,9 @@ typedef struct {
|
|||
qTaskInfo_t task[5];
|
||||
} STqExec;
|
||||
|
||||
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
|
||||
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
|
||||
|
||||
struct STQ {
|
||||
char* path;
|
||||
SHashObj* pushMgr; // consumerId -> STqExec*
|
||||
|
@ -93,7 +96,7 @@ struct STQ {
|
|||
SHashObj* pStreamTasks;
|
||||
SVnode* pVnode;
|
||||
SWal* pWal;
|
||||
// TDB* pTdb;
|
||||
TDB* pTdb;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
@ -101,7 +104,7 @@ typedef struct {
|
|||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
static STqMgmt tqMgmt = {0};
|
||||
|
||||
// init once
|
||||
int tqInit();
|
||||
|
|
|
@ -14,14 +14,37 @@
|
|||
*/
|
||||
|
||||
#include "tq.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
int32_t tqInit() {
|
||||
//
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
|
||||
if (old != 2) break;
|
||||
}
|
||||
|
||||
if (old == 0) {
|
||||
tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
|
||||
if (tqMgmt.timer == NULL) {
|
||||
atomic_store_8(&tqMgmt.inited, 0);
|
||||
return -1;
|
||||
}
|
||||
atomic_store_8(&tqMgmt.inited, 1);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tqCleanUp() {}
|
||||
void tqCleanUp() {
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
|
||||
if (old != 2) break;
|
||||
}
|
||||
|
||||
if (old == 1) {
|
||||
taosTmrCleanUp(tqMgmt.timer);
|
||||
atomic_store_8(&tqMgmt.inited, 0);
|
||||
}
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||
|
@ -32,9 +55,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
pTq->path = strdup(path);
|
||||
pTq->pVnode = pVnode;
|
||||
pTq->pWal = pWal;
|
||||
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
|
||||
/*ASSERT(0);*/
|
||||
/*}*/
|
||||
if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
|
@ -51,11 +74,45 @@ void tqClose(STQ* pTq) {
|
|||
taosHashCleanup(pTq->execs);
|
||||
taosHashCleanup(pTq->pStreamTasks);
|
||||
taosHashCleanup(pTq->pushMgr);
|
||||
tdbClose(pTq->pTdb);
|
||||
taosMemoryFree(pTq);
|
||||
}
|
||||
// TODO
|
||||
}
|
||||
|
||||
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
|
||||
// TODO encode modified exec
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
|
||||
// TODO decode modified exec
|
||||
}
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
|
|
|
@ -252,6 +252,45 @@ static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) {
|
|||
pSrc->cols = pCols;
|
||||
}
|
||||
|
||||
static void printTsdbLoadBlkData(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const char *tag, int32_t ln) {
|
||||
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
|
||||
if (pBlock) {
|
||||
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
pHeadf->f.aname);
|
||||
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
pDFile->f.aname);
|
||||
}
|
||||
SDataCol *pDCol = pDCols->cols + 0;
|
||||
if (TSKEY_MIN == *(int64_t *)pDCol->pData) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
int rows = pDCols->numOfRows;
|
||||
for (int r = 0; r < rows; ++r) {
|
||||
if (pBlock) {
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
rows, r);
|
||||
} else {
|
||||
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
|
||||
}
|
||||
|
||||
int nDataCols = pDCols->numOfCols;
|
||||
int j = 0;
|
||||
SCellVal sVal = {0};
|
||||
while (j < nDataCols) {
|
||||
SDataCol *pDataCol = pDCols->cols + j;
|
||||
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
|
||||
tdSCellValPrint(&sVal, pDataCol->type);
|
||||
++j;
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||
STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo);
|
||||
|
@ -266,15 +305,23 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, __func__, __LINE__);
|
||||
#endif
|
||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||
iBlock++;
|
||||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkData(pReadh, pReadh->pDCols[1], iBlock, __func__, __LINE__);
|
||||
#endif
|
||||
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
|
||||
return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === MERGE === ", __LINE__);
|
||||
#endif
|
||||
}
|
||||
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
|
@ -286,6 +333,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
}
|
||||
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
|
||||
ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === UPDATE FILTER === ", __LINE__);
|
||||
#endif
|
||||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
|
@ -295,6 +345,53 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void printTsdbLoadBlkDataCols(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const int16_t *colIds,
|
||||
int numOfColsIds, const char *tag, int32_t ln) {
|
||||
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
|
||||
if (pBlock) {
|
||||
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
pHeadf->f.aname);
|
||||
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
pDFile->f.aname);
|
||||
}
|
||||
|
||||
int rows = pDCols->numOfRows;
|
||||
for (int r = 0; r < rows; ++r) {
|
||||
if (pBlock) {
|
||||
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
|
||||
rows, r);
|
||||
} else {
|
||||
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
|
||||
}
|
||||
|
||||
int nDataCols = pDCols->numOfCols;
|
||||
int j = 0, k = 0;
|
||||
SCellVal sVal = {0};
|
||||
while (j < nDataCols) {
|
||||
if (k >= numOfColsIds) break;
|
||||
SDataCol *pDataCol = pDCols->cols + j;
|
||||
int16_t colId1 = pDataCol->colId;
|
||||
int16_t colId2 = *(colIds + k);
|
||||
if (colId1 < colId2) {
|
||||
++j;
|
||||
} else if (colId1 > colId2) {
|
||||
++k; // colId2 not exists in SDataCols
|
||||
printf("NotExists ");
|
||||
} else {
|
||||
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
|
||||
tdSCellValPrint(&sVal, pDataCol->type);
|
||||
++j;
|
||||
++k;
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
// TODO: filter by Multi-Version
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
|
||||
bool mergeBitmap) {
|
||||
|
@ -310,14 +407,25 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
}
|
||||
}
|
||||
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0) return -1;
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0)
|
||||
return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], iBlock, colIds, numOfColsIds, __func__, __LINE__);
|
||||
#endif
|
||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||
iBlock++;
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0) return -1;
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0)
|
||||
return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[1], iBlock, colIds, numOfColsIds, __func__, __LINE__);
|
||||
#endif
|
||||
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
|
||||
return -1;
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, __func__, __LINE__);
|
||||
#endif
|
||||
}
|
||||
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
|
@ -329,18 +437,23 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
}
|
||||
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
|
||||
ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds,
|
||||
" === update filter === ", __LINE__);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
|
||||
for (int i = 0; i < numOfColsIds; ++i) {
|
||||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
||||
if (pDataCol->len > 0 && pDataCol->bitmap) {
|
||||
ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
ASSERT(pDataCol->pBitmap);
|
||||
tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap);
|
||||
tdDataColsSetBitmapI(pReadh->pDCols[0]);
|
||||
}
|
||||
}
|
||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, " === merge bitmap === ", __LINE__);
|
||||
#endif
|
||||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
|
@ -551,9 +664,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
|||
|
||||
tdResetDataCols(pDataCols);
|
||||
|
||||
if (tdIsBitmapModeI(bitmapMode)) {
|
||||
tdDataColsSetBitmapI(pDataCols);
|
||||
}
|
||||
pDataCols->bitmapMode = bitmapMode;
|
||||
|
||||
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
|
||||
|
||||
|
@ -740,9 +851,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
|||
|
||||
tdResetDataCols(pDataCols);
|
||||
|
||||
if (tdIsBitmapModeI(bitmapMode)) {
|
||||
tdDataColsSetBitmapI(pDataCols);
|
||||
}
|
||||
pDataCols->bitmapMode = bitmapMode;
|
||||
|
||||
// If only load timestamp column, no need to load SBlockData part
|
||||
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
|
||||
|
|
|
@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) {
|
|||
|
||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||
|
||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||
|
||||
|
@ -141,5 +147,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinish = NULL;
|
||||
return pFsm;
|
||||
}
|
|
@ -34,7 +34,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
|
|||
endif(${BUILD_WITH_INVERTEDINDEX})
|
||||
|
||||
|
||||
#if (${BUILD_TEST})
|
||||
# add_subdirectory(test)
|
||||
#endif(${BUILD_TEST})
|
||||
if (${BUILD_TEST})
|
||||
add_subdirectory(test)
|
||||
endif(${BUILD_TEST})
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ typedef struct TFileHeader {
|
|||
} TFileHeader;
|
||||
#pragma pack(pop)
|
||||
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
|
||||
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
|
||||
|
||||
typedef struct TFileValue {
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "lucene++/Lucene_c.h"
|
||||
#endif
|
||||
|
||||
#define INDEX_NUM_OF_THREADS 4
|
||||
#define INDEX_NUM_OF_THREADS 1
|
||||
#define INDEX_QUEUE_SIZE 200
|
||||
|
||||
#define INDEX_DATA_BOOL_NULL 0x02
|
||||
|
@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
sIdx->path = tstrdup(path);
|
||||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
||||
tsem_init(&sIdx->sem, 0, 0);
|
||||
// taosThreadCondInit(&sIdx->finished, NULL);
|
||||
|
||||
sIdx->refId = indexAddRef(sIdx);
|
||||
indexAcquireRef(sIdx->refId);
|
||||
|
@ -143,13 +142,13 @@ void indexDestroy(void* handle) {
|
|||
return;
|
||||
}
|
||||
void indexClose(SIndex* sIdx) {
|
||||
indexReleaseRef(sIdx->refId);
|
||||
bool ref = 0;
|
||||
if (sIdx->colObj != NULL) {
|
||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
indexCacheForceToMerge((void*)(*pCache));
|
||||
indexInfo("%s wait to merge", (*pCache)->colName);
|
||||
indexWait((void*)(sIdx));
|
||||
iter = taosHashIterate(sIdx->colObj, iter);
|
||||
indexCacheUnRef(*pCache);
|
||||
|
@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) {
|
|||
taosHashCleanup(sIdx->colObj);
|
||||
sIdx->colObj = NULL;
|
||||
}
|
||||
// taosMsleep(1000 * 5);
|
||||
indexReleaseRef(sIdx->refId);
|
||||
indexRemoveRef(sIdx->refId);
|
||||
}
|
||||
int64_t indexAddRef(void* p) {
|
||||
|
@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
|||
taosMemoryFree(value->colVal);
|
||||
value->colVal = NULL;
|
||||
}
|
||||
|
||||
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
|
||||
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
|
||||
int64_t ver = CACHE_VERSION(cache);
|
||||
taosThreadMutexLock(&sIdx->mtx);
|
||||
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
|
||||
if (trd != NULL) {
|
||||
if (ver < trd->header.version) {
|
||||
ver = trd->header.version + 1;
|
||||
} else {
|
||||
ver += 1;
|
||||
}
|
||||
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver);
|
||||
tfileReaderUnRef(trd);
|
||||
} else {
|
||||
indexInfo("not found reader base %p", trd);
|
||||
}
|
||||
taosThreadMutexUnlock(&sIdx->mtx);
|
||||
return ver;
|
||||
}
|
||||
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||
int32_t version = CACHE_VERSION(cache);
|
||||
int64_t version = indexGetAvaialbleVer(sIdx, cache);
|
||||
indexInfo("file name version: %" PRId64 "", version);
|
||||
uint8_t colType = cache->type;
|
||||
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
|
||||
|
@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
if (reader == NULL) {
|
||||
return -1;
|
||||
}
|
||||
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
|
||||
|
||||
TFileHeader* header = &reader->header;
|
||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||
|
|
|
@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
|
|||
taosThreadCondInit(&cache->finished, NULL);
|
||||
|
||||
indexCacheRef(cache);
|
||||
if (idx != NULL) {
|
||||
indexAcquireRef(idx->refId);
|
||||
}
|
||||
return cache;
|
||||
}
|
||||
void indexCacheDebug(IndexCache* cache) {
|
||||
|
@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) {
|
|||
if (pCache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
indexMemUnRef(pCache->mem);
|
||||
indexMemUnRef(pCache->imm);
|
||||
taosMemoryFree(pCache->colName);
|
||||
|
||||
taosThreadMutexDestroy(&pCache->mtx);
|
||||
taosThreadCondDestroy(&pCache->finished);
|
||||
|
||||
if (pCache->index != NULL) {
|
||||
indexReleaseRef(((SIndex*)pCache->index)->refId);
|
||||
}
|
||||
taosMemoryFree(pCache);
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
int64_t file_size;
|
||||
taosStatFile(path, &file_size, NULL);
|
||||
ctx->file.size = (int)file_size;
|
||||
|
||||
} else {
|
||||
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
p *
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
|
@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
|||
char buf[128] = {0};
|
||||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
assert(sz < sizeof(buf));
|
||||
indexInfo("Try to get key: %s", buf);
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (reader == NULL) {
|
||||
if (reader == NULL || *reader == NULL) {
|
||||
indexInfo("failed to get key: %s", buf);
|
||||
return NULL;
|
||||
}
|
||||
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
|
||||
tfileReaderRef(*reader);
|
||||
|
||||
return *reader;
|
||||
|
@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
|||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
// remove last version index reader
|
||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (p != NULL) {
|
||||
if (p != NULL && *p != NULL) {
|
||||
TFileReader* oldReader = *p;
|
||||
taosHashRemove(tcache->tableCache, buf, sz);
|
||||
indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf);
|
||||
oldReader->remove = true;
|
||||
tfileReaderUnRef(oldReader);
|
||||
}
|
||||
|
@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderVerify(reader)) {
|
||||
|
@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
tfileReaderDestroy(reader);
|
||||
return NULL;
|
||||
}
|
||||
reader->remove = false;
|
||||
|
||||
return reader;
|
||||
}
|
||||
|
@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||
indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
|
|
|
@ -674,10 +674,13 @@ class IndexObj {
|
|||
// opt
|
||||
numOfWrite = 0;
|
||||
numOfRead = 0;
|
||||
indexInit();
|
||||
// indexInit();
|
||||
}
|
||||
int Init(const std::string& dir) {
|
||||
taosRemoveDir(dir.c_str());
|
||||
int Init(const std::string& dir, bool remove = true) {
|
||||
if (remove) {
|
||||
taosRemoveDir(dir.c_str());
|
||||
taosMkDir(dir.c_str());
|
||||
}
|
||||
taosMkDir(dir.c_str());
|
||||
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||
if (ret != 0) {
|
||||
|
@ -838,8 +841,11 @@ class IndexEnv2 : public ::testing::Test {
|
|||
initLog();
|
||||
index = new IndexObj();
|
||||
}
|
||||
virtual void TearDown() { delete index; }
|
||||
IndexObj* index;
|
||||
virtual void TearDown() {
|
||||
// taosMsleep(500);
|
||||
delete index;
|
||||
}
|
||||
IndexObj* index;
|
||||
};
|
||||
TEST_F(IndexEnv2, testIndexOpen) {
|
||||
std::string path = TD_TMP_DIR_PATH "test";
|
||||
|
@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) {
|
|||
target = idx->SearchOne("tag2", "Test");
|
||||
}
|
||||
static void multi_write_and_search(IndexObj* idx) {
|
||||
idx->PutOne("tag1", "Hello");
|
||||
idx->PutOne("tag2", "Test");
|
||||
int target = idx->SearchOne("tag1", "Hello");
|
||||
target = idx->SearchOne("tag2", "Test");
|
||||
idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100);
|
||||
|
@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
|||
}
|
||||
}
|
||||
|
||||
// TEST_F(IndexEnv2, testIndex_restart) {
|
||||
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
// if (index->Init(path) != 0) {
|
||||
// }
|
||||
// index->SearchOneTarget("tag1", "Hello", 10);
|
||||
// index->SearchOneTarget("tag2", "Test", 10);
|
||||
//}
|
||||
TEST_F(IndexEnv2, testIndex_restart) {
|
||||
std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
if (index->Init(path, false) != 0) {
|
||||
}
|
||||
index->SearchOneTarget("tag1", "Hello", 10);
|
||||
index->SearchOneTarget("tag2", "Test", 10);
|
||||
}
|
||||
// TEST_F(IndexEnv2, testIndex_restart1) {
|
||||
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
// if (index->Init(path) != 0) {
|
||||
// if (index->Init(path, false) != 0) {
|
||||
// }
|
||||
// index->ReadMultiMillonData("tag1", "coding");
|
||||
// index->SearchOneTarget("tag1", "Hello", 10);
|
||||
|
@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
|||
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
// assert(3 == index->SearchOne("tag1", "Hello"));
|
||||
//}
|
||||
// TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
// std::string path = TD_TMP_DIR_PATH "multi_tag";
|
||||
// if (index->Init(path) != 0) {
|
||||
// }
|
||||
// int64_t st = taosGetTimestampUs();
|
||||
// int32_t num = 1000 * 10000;
|
||||
// index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
// std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
//}
|
||||
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
std::string path = TD_TMP_DIR_PATH "multi_tag";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t num = 100 * 100;
|
||||
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal1) {
|
||||
std::string path = TD_TMP_DIR_PATH "long_colVal";
|
||||
if (index->Init(path) != 0) {
|
||||
|
|
|
@ -264,7 +264,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, i
|
|||
SSchJob *schAcquireJob(int64_t refId);
|
||||
int32_t schReleaseJob(int64_t refId);
|
||||
void schFreeFlowCtrl(SSchJob *pJob);
|
||||
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
||||
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
||||
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
|
||||
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
|
||||
|
@ -275,6 +275,32 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
|
|||
int32_t schCloneSMsgSendInfo(void *src, void **dst);
|
||||
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
|
||||
void schFreeJobImpl(void *job);
|
||||
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam);
|
||||
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
|
||||
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
|
||||
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code);
|
||||
void schFreeRpcCtx(SRpcCtx *pCtx);
|
||||
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
|
||||
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
|
||||
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp);
|
||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
|
||||
void schProcessOnDataFetched(SSchJob *job);
|
||||
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
|
||||
void schFreeRpcCtxVal(const void *arg);
|
||||
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
|
||||
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
|
||||
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
bool syncSchedule);
|
||||
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
int64_t startTs, bool sync);
|
||||
int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
|
||||
int32_t schCancelJob(SSchJob *pJob);
|
||||
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
|
||||
uint64_t schGenTaskId(void);
|
||||
void schCloseJobRef(void);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -40,7 +40,7 @@ void schFreeFlowCtrl(SSchJob *pJob) {
|
|||
pJob->flowCtrl = NULL;
|
||||
}
|
||||
|
||||
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
||||
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
||||
if (!SCH_IS_QUERY_JOB(pJob)) {
|
||||
SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "catalog.h"
|
||||
#include "command.h"
|
||||
#include "query.h"
|
||||
#include "schedulerInt.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
void schCloseJobRef(void) {
|
||||
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
|
||||
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
|
||||
taosCloseRef(schMgmt.jobRef);
|
||||
schMgmt.jobRef = -1;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
|
||||
}
|
||||
|
||||
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
|
||||
|
||||
uint64_t schGenUUID(void) {
|
||||
static uint64_t hashId = 0;
|
||||
static int32_t requestSerialId = 0;
|
||||
|
||||
if (hashId == 0) {
|
||||
char uid[64];
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
hashId = MurmurHash3_32(uid, strlen(uid));
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
uint64_t pid = taosGetPId();
|
||||
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
|
||||
|
||||
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
void schFreeRpcCtxVal(const void *arg) {
|
||||
if (NULL == arg) {
|
||||
return;
|
||||
}
|
||||
|
||||
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
|
||||
taosMemoryFreeClear(pMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
}
|
||||
|
||||
void schFreeRpcCtx(SRpcCtx *pCtx) {
|
||||
if (NULL == pCtx) {
|
||||
return;
|
||||
}
|
||||
void *pIter = taosHashIterate(pCtx->args, NULL);
|
||||
while (pIter) {
|
||||
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
|
||||
|
||||
(*ctxVal->freeFunc)(ctxVal->val);
|
||||
|
||||
pIter = taosHashIterate(pCtx->args, pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(pCtx->args);
|
||||
|
||||
if (pCtx->brokenVal.freeFunc) {
|
||||
(*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
|
||||
}
|
||||
}
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -36,8 +36,8 @@ typedef struct SSyncIO {
|
|||
STaosQueue *pMsgQ;
|
||||
STaosQset * pQset;
|
||||
TdThread consumerTid;
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
void * serverRpc;
|
||||
void * clientRpc;
|
||||
SEpSet myAddr;
|
||||
SMsgCb msgcb;
|
||||
|
||||
|
|
|
@ -147,6 +147,11 @@ typedef struct SSyncNode {
|
|||
// tools
|
||||
SSyncRespMgr* pSyncRespMgr;
|
||||
|
||||
// restore state
|
||||
bool restoreFinish;
|
||||
//sem_t restoreSem;
|
||||
SSnapshot* pSnapshot;
|
||||
|
||||
} SSyncNode;
|
||||
|
||||
// open/close --------------
|
||||
|
|
|
@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
cbMeta.code = 0;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
|
||||
bool needExecute = true;
|
||||
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
|
||||
needExecute = false;
|
||||
}
|
||||
|
||||
if (needExecute) {
|
||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
// config change
|
||||
|
@ -349,6 +359,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// restore finish
|
||||
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
if (ths->restoreFinish == false) {
|
||||
if (ths->pFsm->FpRestoreFinish != NULL) {
|
||||
ths->pFsm->FpRestoreFinish(ths->pFsm);
|
||||
}
|
||||
ths->restoreFinish = true;
|
||||
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
|
||||
|
||||
/*
|
||||
tsem_post(&ths->restoreSem);
|
||||
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
|
|
@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
cbMeta.code = 0;
|
||||
cbMeta.state = pSyncNode->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||
|
||||
bool needExecute = true;
|
||||
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
|
||||
needExecute = false;
|
||||
}
|
||||
|
||||
if (needExecute) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
// config change
|
||||
|
@ -127,6 +136,22 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
// restore finish
|
||||
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
|
||||
if (pSyncNode->restoreFinish == false) {
|
||||
if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
|
||||
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
|
||||
}
|
||||
pSyncNode->restoreFinish = true;
|
||||
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||
|
||||
/*
|
||||
tsem_post(&pSyncNode->restoreSem);
|
||||
sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
@ -162,4 +187,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
|
|||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
#include "sync.h"
|
||||
#include "syncAppendEntries.h"
|
||||
#include "syncAppendEntriesReply.h"
|
||||
|
@ -55,14 +54,17 @@ static void syncFreeNode(void* param);
|
|||
// ---------------------------------
|
||||
|
||||
int32_t syncInit() {
|
||||
int32_t ret;
|
||||
tsNodeRefId = taosOpenRef(200, syncFreeNode);
|
||||
if (tsNodeRefId < 0) {
|
||||
sError("failed to init node ref");
|
||||
syncCleanUp();
|
||||
ret = -1;
|
||||
} else {
|
||||
ret = syncEnvStart();
|
||||
int32_t ret = 0;
|
||||
|
||||
if (!syncEnvIsStart()) {
|
||||
tsNodeRefId = taosOpenRef(200, syncFreeNode);
|
||||
if (tsNodeRefId < 0) {
|
||||
sError("failed to init node ref");
|
||||
syncCleanUp();
|
||||
ret = -1;
|
||||
} else {
|
||||
ret = syncEnvStart();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -240,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
|
||||
|
@ -490,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
|
||||
assert(pSyncNode->pSyncRespMgr != NULL);
|
||||
|
||||
// restore state
|
||||
pSyncNode->restoreFinish = false;
|
||||
pSyncNode->pSnapshot = NULL;
|
||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
||||
}
|
||||
//tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
||||
|
||||
// start in syncNodeStart
|
||||
// start raft
|
||||
// syncNodeBecomeFollower(pSyncNode);
|
||||
|
@ -509,6 +520,20 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
|||
// use this now
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||
|
||||
/*
|
||||
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
|
||||
tsem_wait(&pSyncNode->restoreSem);
|
||||
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
|
||||
*/
|
||||
|
||||
/*
|
||||
while (pSyncNode->restoreFinish != true) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
*/
|
||||
|
||||
sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -518,6 +543,19 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
|||
int32_t ret = 0;
|
||||
// ret = syncNodeStartPingTimer(pSyncNode);
|
||||
assert(ret == 0);
|
||||
|
||||
/*
|
||||
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
|
||||
tsem_wait(&pSyncNode->restoreSem);
|
||||
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
|
||||
*/
|
||||
|
||||
/*
|
||||
while (pSyncNode->restoreFinish != true) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
*/
|
||||
sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||
}
|
||||
|
||||
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||
|
@ -554,6 +592,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
taosMemoryFree(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
if (pSyncNode->pSnapshot != NULL) {
|
||||
taosMemoryFree(pSyncNode->pSnapshot);
|
||||
}
|
||||
|
||||
//tsem_destroy(&pSyncNode->restoreSem);
|
||||
|
||||
// free memory in syncFreeNode
|
||||
// taosMemoryFree(pSyncNode);
|
||||
}
|
||||
|
|
|
@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void FpRestoreFinishCb(struct SSyncFSM* pFsm) {
|
||||
sTrace("==callback== ==FpRestoreFinishCb==");
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||
pFsm->FpCommitCb = CommitCb;
|
||||
pFsm->FpPreCommitCb = PreCommitCb;
|
||||
pFsm->FpRollBackCb = RollBackCb;
|
||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
||||
pFsm->FpRestoreFinish = FpRestoreFinishCb;
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
|
|
|
@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
sprintf(tsTempDir, "%s", ".");
|
||||
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
|
|
|
@ -94,7 +94,7 @@ typedef void* queue[2];
|
|||
/* Return the structure holding the given element. */
|
||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||
|
||||
#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit
|
||||
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
||||
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
#include "transComm.h"
|
||||
|
||||
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
|
||||
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
|
||||
transInitServer, transInitClient};
|
||||
|
||||
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
|
||||
|
@ -77,37 +77,38 @@ void rpcClose(void* arg) {
|
|||
taosMemoryFree(pRpc);
|
||||
return;
|
||||
}
|
||||
void* rpcMallocCont(int contLen) {
|
||||
int size = contLen + TRANS_MSG_OVERHEAD;
|
||||
|
||||
char* start = (char*)taosMemoryCalloc(1, (size_t)size);
|
||||
void* rpcMallocCont(int32_t contLen) {
|
||||
int32_t size = contLen + TRANS_MSG_OVERHEAD;
|
||||
char* start = taosMemoryCalloc(1, size);
|
||||
if (start == NULL) {
|
||||
tError("failed to malloc msg, size:%d", size);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
} else {
|
||||
tTrace("malloc mem:%p size:%d", start, size);
|
||||
}
|
||||
|
||||
return start + sizeof(STransMsgHead);
|
||||
}
|
||||
void rpcFreeCont(void* cont) {
|
||||
// impl
|
||||
if (cont == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
void rpcFreeCont(void* cont) {
|
||||
if (cont == NULL) return;
|
||||
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void* rpcReallocCont(void* ptr, int contLen) {
|
||||
if (ptr == NULL) {
|
||||
return rpcMallocCont(contLen);
|
||||
}
|
||||
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
||||
int sz = contLen + TRANS_MSG_OVERHEAD;
|
||||
|
||||
void* rpcReallocCont(void* ptr, int32_t contLen) {
|
||||
if (ptr == NULL) return rpcMallocCont(contLen);
|
||||
|
||||
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
||||
int32_t sz = contLen + TRANS_MSG_OVERHEAD;
|
||||
st = taosMemoryRealloc(st, sz);
|
||||
if (st == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return st + TRANS_MSG_OVERHEAD;
|
||||
}
|
||||
|
||||
|
@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
|
|||
assert(0);
|
||||
}
|
||||
|
||||
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||
void rpcCancelRequest(int64_t rid) { return; }
|
||||
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
|
||||
void rpcCancelRequest(int64_t rid) { return; }
|
||||
|
||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||
transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||
|
@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
|||
transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||
}
|
||||
|
||||
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
|
||||
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
||||
int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
|
||||
|
||||
void rpcRefHandle(void* handle, int8_t type) {
|
||||
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||
|
|
|
@ -14,17 +14,17 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tcompare.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tcompare.h"
|
||||
#include "tref.h"
|
||||
#include "walInt.h"
|
||||
|
||||
typedef struct {
|
||||
int8_t stop;
|
||||
int8_t inited;
|
||||
uint32_t seq;
|
||||
int32_t refSetId;
|
||||
int8_t stop;
|
||||
int8_t inited;
|
||||
uint32_t seq;
|
||||
int32_t refSetId;
|
||||
TdThread thread;
|
||||
} SWalMgmt;
|
||||
|
||||
|
@ -36,30 +36,42 @@ static void walFreeObj(void *pWal);
|
|||
int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
|
||||
|
||||
int32_t walInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
||||
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
||||
|
||||
int32_t code = walCreateThread();
|
||||
if (code != 0) {
|
||||
wError("failed to init wal module since %s", tstrerror(code));
|
||||
atomic_store_8(&tsWal.inited, 0);
|
||||
return code;
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
|
||||
if (old != 2) break;
|
||||
}
|
||||
|
||||
if (old == 0) {
|
||||
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
||||
|
||||
int32_t code = walCreateThread();
|
||||
if (code != 0) {
|
||||
wError("failed to init wal module since %s", tstrerror(code));
|
||||
atomic_store_8(&tsWal.inited, 0);
|
||||
return code;
|
||||
}
|
||||
|
||||
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
||||
atomic_store_8(&tsWal.inited, 1);
|
||||
}
|
||||
|
||||
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void walCleanUp() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
|
||||
if (old == 0) {
|
||||
return;
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
|
||||
if (old != 2) break;
|
||||
}
|
||||
|
||||
if (old == 1) {
|
||||
walStopThread();
|
||||
taosCloseRef(tsWal.refSetId);
|
||||
wInfo("wal module is cleaned up");
|
||||
atomic_store_8(&tsWal.inited, 0);
|
||||
}
|
||||
walStopThread();
|
||||
taosCloseRef(tsWal.refSetId);
|
||||
wInfo("wal module is cleaned up");
|
||||
}
|
||||
|
||||
SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||
|
@ -126,7 +138,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
}
|
||||
|
||||
if (walCheckAndRepairIdx(pWal) < 0) {
|
||||
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
||||
|
|
|
@ -259,6 +259,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exist
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
|
||||
|
||||
// mnode-mq
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
|
||||
|
|
|
@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) {
|
|||
}
|
||||
|
||||
void dumpTrans(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "transactions", items);
|
||||
|
||||
|
@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
|||
void dumpHeader(SSdb *pSdb, SJson *json) {
|
||||
tjsonAddIntegerToObject(json, "sver", 1);
|
||||
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
|
||||
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm));
|
||||
|
||||
SJson *maxIdsJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
||||
|
|
Loading…
Reference in New Issue