Merge remote-tracking branch 'origin/3.0' into feat/TD-27337
This commit is contained in:
commit
7171d38415
|
@ -109,6 +109,7 @@ extern bool tsMonitorComp;
|
||||||
// audit
|
// audit
|
||||||
extern bool tsEnableAudit;
|
extern bool tsEnableAudit;
|
||||||
extern bool tsEnableAuditCreateTable;
|
extern bool tsEnableAuditCreateTable;
|
||||||
|
extern int32_t tsAuditInterval;
|
||||||
|
|
||||||
// telem
|
// telem
|
||||||
extern bool tsEnableTelem;
|
extern bool tsEnableTelem;
|
||||||
|
|
|
@ -23,13 +23,13 @@
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "mnode.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define AUDIT_DETAIL_MAX 65472
|
#define AUDIT_DETAIL_MAX 65472
|
||||||
|
#define AUDIT_OPERATION_LEN 20
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
const char *server;
|
const char *server;
|
||||||
|
@ -37,13 +37,28 @@ typedef struct {
|
||||||
bool comp;
|
bool comp;
|
||||||
} SAuditCfg;
|
} SAuditCfg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t curTime;
|
||||||
|
char strClusterId[TSDB_CLUSTER_ID_LEN];
|
||||||
|
char clientAddress[50];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char operation[AUDIT_OPERATION_LEN];
|
||||||
|
char target1[TSDB_DB_NAME_LEN]; //put db name
|
||||||
|
char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max
|
||||||
|
char* detail;
|
||||||
|
} SAuditRecord;
|
||||||
|
|
||||||
int32_t auditInit(const SAuditCfg *pCfg);
|
int32_t auditInit(const SAuditCfg *pCfg);
|
||||||
|
void auditCleanup();
|
||||||
void auditSend(SJson *pJson);
|
void auditSend(SJson *pJson);
|
||||||
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
char *detail, int32_t len);
|
char *detail, int32_t len);
|
||||||
|
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
|
char *detail, int32_t len);
|
||||||
|
void auditSendRecordsInBatch();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_MONITOR_H_*/
|
#endif /*_TD_AUDIT_H_*/
|
||||||
|
|
|
@ -33,11 +33,12 @@ extern "C" {
|
||||||
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
|
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
|
||||||
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
|
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
|
||||||
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
|
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
|
||||||
#define SYNC_DEL_WAL_MS (1000 * 60)
|
|
||||||
#define SYNC_ADD_QUORUM_COUNT 3
|
#define SYNC_ADD_QUORUM_COUNT 3
|
||||||
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
|
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
|
||||||
#define SNAPSHOT_WAIT_MS 1000 * 5
|
#define SNAPSHOT_WAIT_MS 1000 * 5
|
||||||
|
|
||||||
|
#define SYNC_WAL_LOG_RETENTION_SIZE (8LL * 1024 * 1024 * 1024)
|
||||||
|
|
||||||
#define SYNC_MAX_RETRY_BACKOFF 5
|
#define SYNC_MAX_RETRY_BACKOFF 5
|
||||||
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
|
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
|
||||||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||||
|
@ -219,6 +220,7 @@ typedef struct SSyncLogStore {
|
||||||
|
|
||||||
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
|
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
|
||||||
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
|
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
|
||||||
|
SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes);
|
||||||
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
|
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
|
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
|
||||||
|
|
|
@ -225,6 +225,7 @@ bool walIsEmpty(SWal *);
|
||||||
int64_t walGetFirstVer(SWal *);
|
int64_t walGetFirstVer(SWal *);
|
||||||
int64_t walGetSnapshotVer(SWal *);
|
int64_t walGetSnapshotVer(SWal *);
|
||||||
int64_t walGetLastVer(SWal *);
|
int64_t walGetLastVer(SWal *);
|
||||||
|
int64_t walGetVerRetention(SWal *pWal, int64_t bytes);
|
||||||
int64_t walGetCommittedVer(SWal *);
|
int64_t walGetCommittedVer(SWal *);
|
||||||
int64_t walGetAppliedVer(SWal *);
|
int64_t walGetAppliedVer(SWal *);
|
||||||
|
|
||||||
|
|
|
@ -671,6 +671,13 @@ upload:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (i > 0 && cp.parts[i - 1].completed) {
|
||||||
|
if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto clean;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int seq = cp.parts[i].index + 1;
|
int seq = cp.parts[i].index + 1;
|
||||||
|
|
||||||
partData.manager = &manager;
|
partData.manager = &manager;
|
||||||
|
|
|
@ -97,6 +97,7 @@ bool tsMonitorComp = false;
|
||||||
// audit
|
// audit
|
||||||
bool tsEnableAudit = true;
|
bool tsEnableAudit = true;
|
||||||
bool tsEnableAuditCreateTable = true;
|
bool tsEnableAuditCreateTable = true;
|
||||||
|
int32_t tsAuditInterval = 500;
|
||||||
|
|
||||||
// telem
|
// telem
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
|
@ -686,6 +687,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
|
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||||
|
@ -1137,6 +1140,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;
|
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;
|
||||||
tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval;
|
tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval;
|
||||||
|
tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32;
|
||||||
|
|
||||||
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
||||||
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
|
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
|
||||||
|
|
|
@ -30,12 +30,14 @@ typedef struct SDnodeMgmt {
|
||||||
TdThread statusThread;
|
TdThread statusThread;
|
||||||
TdThread notifyThread;
|
TdThread notifyThread;
|
||||||
TdThread monitorThread;
|
TdThread monitorThread;
|
||||||
|
TdThread auditThread;
|
||||||
TdThread crashReportThread;
|
TdThread crashReportThread;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
ProcessCreateNodeFp processCreateNodeFp;
|
ProcessCreateNodeFp processCreateNodeFp;
|
||||||
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
|
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
SendMonitorReportFp sendMonitorReportFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
|
SendAuditRecordsFp sendAuditRecordsFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
||||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
|
@ -62,7 +64,9 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
|
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
|
||||||
|
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
|
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
|
||||||
|
void dmStopAuditThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopCrashReportThread(SDnodeMgmt *pMgmt);
|
void dmStopCrashReportThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
|
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||||
if (dmStartMonitorThread(pMgmt) != 0) {
|
if (dmStartMonitorThread(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (dmStartAuditThread(pMgmt) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (dmStartCrashReportThread(pMgmt) != 0) {
|
if (dmStartCrashReportThread(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -38,6 +41,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||||
static void dmStopMgmt(SDnodeMgmt *pMgmt) {
|
static void dmStopMgmt(SDnodeMgmt *pMgmt) {
|
||||||
pMgmt->pData->stopped = true;
|
pMgmt->pData->stopped = true;
|
||||||
dmStopMonitorThread(pMgmt);
|
dmStopMonitorThread(pMgmt);
|
||||||
|
dmStopAuditThread(pMgmt);
|
||||||
dmStopStatusThread(pMgmt);
|
dmStopStatusThread(pMgmt);
|
||||||
#if defined(TD_ENTERPRISE)
|
#if defined(TD_ENTERPRISE)
|
||||||
dmStopNotifyThread(pMgmt);
|
dmStopNotifyThread(pMgmt);
|
||||||
|
@ -60,6 +64,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp;
|
pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp;
|
||||||
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
||||||
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
||||||
|
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
|
||||||
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
||||||
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
|
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
|
||||||
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||||
|
|
|
@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *dmAuditThreadFp(void *param) {
|
||||||
|
SDnodeMgmt *pMgmt = param;
|
||||||
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
setThreadName("dnode-audit");
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
taosMsleep(100);
|
||||||
|
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
|
||||||
|
|
||||||
|
int64_t curTime = taosGetTimestampMs();
|
||||||
|
if (curTime < lastTime) lastTime = curTime;
|
||||||
|
float interval = curTime - lastTime;
|
||||||
|
if (interval >= tsAuditInterval) {
|
||||||
|
(*pMgmt->sendAuditRecordsFp)();
|
||||||
|
lastTime = curTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void *dmCrashReportThreadFp(void *param) {
|
static void *dmCrashReportThreadFp(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
int64_t lastTime = taosGetTimestampMs();
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
@ -218,6 +239,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
|
||||||
|
dError("failed to create audit thread since %s", strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
tmsgReportStartup("dnode-audit", "initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
|
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
|
||||||
taosThreadJoin(pMgmt->monitorThread, NULL);
|
taosThreadJoin(pMgmt->monitorThread, NULL);
|
||||||
|
@ -225,6 +260,13 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
|
||||||
|
if (taosCheckPthreadValid(pMgmt->auditThread)) {
|
||||||
|
taosThreadJoin(pMgmt->auditThread, NULL);
|
||||||
|
taosThreadClear(&pMgmt->auditThread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
|
||||||
if (!tsEnableCrashReport) {
|
if (!tsEnableCrashReport) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -124,6 +124,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// dmMonitor.c
|
// dmMonitor.c
|
||||||
void dmSendMonitorReport();
|
void dmSendMonitorReport();
|
||||||
|
void dmSendAuditRecords();
|
||||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
||||||
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
|
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
|
||||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
||||||
|
|
|
@ -189,6 +189,7 @@ void dmCleanup() {
|
||||||
if (dmCheckRepeatCleanup(pDnode) != 0) return;
|
if (dmCheckRepeatCleanup(pDnode) != 0) return;
|
||||||
dmCleanupDnode(pDnode);
|
dmCleanupDnode(pDnode);
|
||||||
monCleanup();
|
monCleanup();
|
||||||
|
auditCleanup();
|
||||||
syncCleanUp();
|
syncCleanUp();
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
udfcClose();
|
udfcClose();
|
||||||
|
@ -396,6 +397,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
|
.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
|
||||||
.processDropNodeFp = dmProcessDropNodeReq,
|
.processDropNodeFp = dmProcessDropNodeReq,
|
||||||
.sendMonitorReportFp = dmSendMonitorReport,
|
.sendMonitorReportFp = dmSendMonitorReport,
|
||||||
|
.sendAuditRecordFp = auditSendRecordsInBatch,
|
||||||
.getVnodeLoadsFp = dmGetVnodeLoads,
|
.getVnodeLoadsFp = dmGetVnodeLoads,
|
||||||
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
|
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
|
||||||
.getMnodeLoadsFp = dmGetMnodeLoads,
|
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
#include "dmNodes.h"
|
#include "dmNodes.h"
|
||||||
|
#include "audit.h"
|
||||||
|
|
||||||
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||||
pInfo->protocol = 1;
|
pInfo->protocol = 1;
|
||||||
|
@ -108,6 +109,11 @@ void dmSendMonitorReport() {
|
||||||
monSendReport();
|
monSendReport();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Todo: put this in seperate file in the future
|
||||||
|
void dmSendAuditRecords() {
|
||||||
|
auditSendRecordsInBatch();
|
||||||
|
}
|
||||||
|
|
||||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = dmInstance();
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||||
|
|
|
@ -86,6 +86,7 @@ typedef enum {
|
||||||
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||||
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||||
typedef void (*SendMonitorReportFp)();
|
typedef void (*SendMonitorReportFp)();
|
||||||
|
typedef void (*SendAuditRecordsFp)();
|
||||||
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
|
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
|
||||||
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
|
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
|
||||||
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
|
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
|
||||||
|
@ -120,6 +121,7 @@ typedef struct {
|
||||||
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
|
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
SendMonitorReportFp sendMonitorReportFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
|
SendAuditRecordsFp sendAuditRecordFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
||||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
|
|
|
@ -705,7 +705,7 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
if(pTask->info.nodeId == SNODE_HANDLE){
|
if (pTask->info.nodeId == SNODE_HANDLE) {
|
||||||
SSnodeObj *pObj = NULL;
|
SSnodeObj *pObj = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
sdbRelease(pMnode->pSdb, pObj);
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||||
|
if (pVgObj != NULL) {
|
||||||
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
} else {
|
||||||
|
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||||
|
@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
|
||||||
|
|
||||||
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
if (pe == NULL) {
|
if (pe == NULL) {
|
||||||
|
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
|
||||||
void* pKey = taosHashGetKey(pEntry, &keyLen);
|
void* pKey = taosHashGetKey(pEntry, &keyLen);
|
||||||
// key is the name of src/dst db name
|
// key is the name of src/dst db name
|
||||||
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
||||||
|
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
|
||||||
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
|
|
||||||
pEntry->startTime);
|
pEntry->startTime);
|
||||||
taosArrayPush(pList, &info);
|
taosArrayPush(pList, &info);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -115,7 +115,7 @@ static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return
|
||||||
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
|
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
ASSERT(writer->ctx->offset == writer->file.size);
|
ASSERT(writer->ctx->offset <= writer->file.size);
|
||||||
ASSERT(writer->config->fid == writer->file.fid);
|
ASSERT(writer->config->fid == writer->file.fid);
|
||||||
|
|
||||||
STFileOp op = (STFileOp){
|
STFileOp op = (STFileOp){
|
||||||
|
|
|
@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
|
||||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
|
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo);
|
||||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||||
|
@ -1466,7 +1466,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInSttBlock(pSttBlockReader)) {
|
if (hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1485,7 +1485,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
minKey = INT64_MAX; // chosen the minimum value
|
minKey = INT64_MAX; // chosen the minimum value
|
||||||
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1498,7 +1498,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = INT64_MIN;
|
minKey = INT64_MIN;
|
||||||
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1705,7 +1705,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
|
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
|
||||||
bool dataInSttFile = hasDataInSttBlock(pSttBlockReader);
|
bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo);
|
||||||
if (dataInDataFile && (!dataInSttFile)) {
|
if (dataInDataFile && (!dataInSttFile)) {
|
||||||
// no stt file block available, only data block exists
|
// no stt file block available, only data block exists
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
|
@ -1791,7 +1791,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInSttBlock(pSttBlockReader)) {
|
if (hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1840,7 +1840,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1857,7 +1857,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2065,7 +2065,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
|
|
||||||
// the stt block reader has been initialized for this table.
|
// the stt block reader has been initialized for this table.
|
||||||
if (pSttBlockReader->uid == pScanInfo->uid) {
|
if (pSttBlockReader->uid == pScanInfo->uid) {
|
||||||
return hasDataInSttBlock(pSttBlockReader);
|
return hasDataInSttBlock(pScanInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttBlockReader->uid != 0) {
|
if (pSttBlockReader->uid != 0) {
|
||||||
|
@ -2158,7 +2158,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
return hasData;
|
return hasData;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) {
|
||||||
|
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
||||||
|
@ -2733,7 +2735,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
while (1) {
|
while (1) {
|
||||||
// no data in stt block and block, no need to proceed.
|
// no data in stt block and block, no need to proceed.
|
||||||
if (!hasDataInSttBlock(pSttBlockReader)) {
|
if (!hasDataInSttBlock(pScanInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2850,7 +2852,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
|
||||||
// no data in stt block, no need to proceed.
|
// no data in stt block, no need to proceed.
|
||||||
while (hasDataInSttBlock(pSttBlockReader)) {
|
while (hasDataInSttBlock(pScanInfo)) {
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||||
|
|
||||||
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
||||||
|
|
|
@ -252,6 +252,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
||||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||||
pScanInfo->cleanSttBlocks = false;
|
pScanInfo->cleanSttBlocks = false;
|
||||||
pScanInfo->numOfRowsInStt = 0;
|
pScanInfo->numOfRowsInStt = 0;
|
||||||
|
pScanInfo->sttBlockReturned = false;
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
||||||
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
#define _TD_AUDIT_INT_H_
|
#define _TD_AUDIT_INT_H_
|
||||||
|
|
||||||
#include "audit.h"
|
#include "audit.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SAuditCfg cfg;
|
SAuditCfg cfg;
|
||||||
|
SArray *records;
|
||||||
|
TdThreadMutex lock;
|
||||||
} SAudit;
|
} SAudit;
|
||||||
|
|
||||||
#endif /*_TD_AUDIT_INT_H_*/
|
#endif /*_TD_AUDIT_INT_H_*/
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include "tarray.h"
|
||||||
#include "auditInt.h"
|
#include "auditInt.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thttp.h"
|
#include "thttp.h"
|
||||||
|
@ -21,25 +23,56 @@
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
#include "audit.h"
|
||||||
|
|
||||||
SAudit tsAudit = {0};
|
SAudit tsAudit = {0};
|
||||||
char* tsAuditUri = "/audit";
|
char* tsAuditUri = "/audit";
|
||||||
|
char* tsAuditBatchUri = "/audit-batch";
|
||||||
|
|
||||||
int32_t auditInit(const SAuditCfg *pCfg) {
|
int32_t auditInit(const SAuditCfg *pCfg) {
|
||||||
tsAudit.cfg = *pCfg;
|
tsAudit.cfg = *pCfg;
|
||||||
|
tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *));
|
||||||
|
taosThreadMutexInit(&tsAudit.lock, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void auditCleanup() {
|
||||||
|
tsLogFp = NULL;
|
||||||
|
taosArrayDestroy(tsAudit.records);
|
||||||
|
tsAudit.records = NULL;
|
||||||
|
taosThreadMutexDestroy(&tsAudit.lock);
|
||||||
|
}
|
||||||
|
|
||||||
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
char *detail, int32_t len);
|
char *detail, int32_t len);
|
||||||
|
extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
|
char *detail, int32_t len);
|
||||||
|
extern void auditSendRecordsInBatchImp();
|
||||||
|
|
||||||
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
char *detail, int32_t len) {
|
char *detail, int32_t len) {
|
||||||
auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
|
auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
|
char *detail, int32_t len) {
|
||||||
|
auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
void auditSendRecordsInBatch(){
|
||||||
|
auditSendRecordsInBatchImp();
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef TD_ENTERPRISE
|
#ifndef TD_ENTERPRISE
|
||||||
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
char *detail, int32_t len) {
|
char *detail, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
|
char *detail, int32_t len) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void auditSendRecordsInBatchImp(){
|
||||||
|
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -46,6 +46,7 @@ SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
|
||||||
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
|
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
|
||||||
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
|
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
|
||||||
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
|
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
|
||||||
|
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes);
|
||||||
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
|
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
|
||||||
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,6 @@ typedef struct SSyncSnapshotSender {
|
||||||
int64_t sendingMS;
|
int64_t sendingMS;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
int64_t waitTime;
|
|
||||||
int64_t lastSendTime;
|
int64_t lastSendTime;
|
||||||
bool finish;
|
bool finish;
|
||||||
|
|
||||||
|
|
|
@ -305,6 +305,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
|
||||||
return minMatchIndex;
|
return minMatchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
|
||||||
|
return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -331,7 +335,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
} else {
|
} else {
|
||||||
// vnode
|
// vnode
|
||||||
if (pSyncNode->replicaNum > 1) {
|
if (pSyncNode->replicaNum > 1) {
|
||||||
// multi replicas
|
|
||||||
logRetention = SYNC_VNODE_LOG_RETENTION;
|
logRetention = SYNC_VNODE_LOG_RETENTION;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -344,7 +347,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
|
SyncIndex retentionIndex =
|
||||||
|
TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
|
||||||
|
logRetention += TMAX(0, lastApplyIndex - retentionIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
_DEL_WAL:
|
_DEL_WAL:
|
||||||
|
|
|
@ -70,6 +70,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
|
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
|
||||||
pLogStore->syncLogEntryCount = raftLogEntryCount;
|
pLogStore->syncLogEntryCount = raftLogEntryCount;
|
||||||
pLogStore->syncLogLastIndex = raftLogLastIndex;
|
pLogStore->syncLogLastIndex = raftLogLastIndex;
|
||||||
|
pLogStore->syncLogIndexRetention = raftLogIndexRetention;
|
||||||
pLogStore->syncLogLastTerm = raftLogLastTerm;
|
pLogStore->syncLogLastTerm = raftLogLastTerm;
|
||||||
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
|
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
|
||||||
pLogStore->syncLogGetEntry = raftLogGetEntry;
|
pLogStore->syncLogGetEntry = raftLogGetEntry;
|
||||||
|
@ -154,6 +155,15 @@ SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
|
||||||
return lastVer;
|
return lastVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes) {
|
||||||
|
SyncIndex lastIndex;
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
SyncIndex lastVer = walGetVerRetention(pWal, bytes);
|
||||||
|
|
||||||
|
return lastVer;
|
||||||
|
}
|
||||||
|
|
||||||
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
|
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
|
|
@ -23,8 +23,9 @@
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
|
||||||
|
|
||||||
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
|
||||||
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
||||||
if (pBuf->entryDeleteCb) {
|
if (pBuf->entryDeleteCb) {
|
||||||
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
|
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
|
||||||
|
@ -34,7 +35,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
||||||
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
|
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
|
||||||
pBuf->end = pBuf->start;
|
pBuf->end = pBuf->start;
|
||||||
pBuf->cursor = pBuf->start - 1;
|
pBuf->cursor = pBuf->start - 1;
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
|
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
|
||||||
|
@ -81,7 +81,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->replicaIndex = replicaIndex;
|
pSender->replicaIndex = replicaIndex;
|
||||||
pSender->term = raftStoreGetTerm(pSyncNode);
|
pSender->term = raftStoreGetTerm(pSyncNode);
|
||||||
pSender->startTime = -1;
|
pSender->startTime = -1;
|
||||||
pSender->waitTime = -1;
|
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
|
|
||||||
|
@ -198,9 +197,9 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
// update flag
|
// update flag
|
||||||
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
|
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
|
||||||
if (stopped) return;
|
if (stopped) return;
|
||||||
|
taosThreadMutexLock(&pSender->pSndBuf->mutex);
|
||||||
|
{
|
||||||
pSender->finish = finish;
|
pSender->finish = finish;
|
||||||
pSender->waitTime = -1;
|
|
||||||
|
|
||||||
// close reader
|
// close reader
|
||||||
if (pSender->pReader != NULL) {
|
if (pSender->pReader != NULL) {
|
||||||
|
@ -212,6 +211,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
|
|
||||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||||
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
|
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
|
||||||
|
@ -324,6 +325,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
|
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
taosThreadMutexLock(&pSndBuf->mutex);
|
taosThreadMutexLock(&pSndBuf->mutex);
|
||||||
|
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
|
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
|
||||||
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
|
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
|
||||||
|
@ -338,6 +342,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
pBlk->sendTimeMs = nowMs;
|
pBlk->sendTimeMs = nowMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
||||||
|
if (snapshotSend(pSender) != 0) {
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
||||||
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
|
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
|
||||||
goto _out;
|
goto _out;
|
||||||
|
@ -361,14 +371,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
taosMsleep(1);
|
||||||
if (pSender->waitTime <= 0) {
|
|
||||||
pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS;
|
|
||||||
}
|
|
||||||
if (timeNow < pSender->waitTime) {
|
|
||||||
sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = snapshotSenderStart(pSender);
|
int32_t code = snapshotSenderStart(pSender);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -504,7 +507,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
|
||||||
pReceiver->term = pPreMsg->term;
|
pReceiver->term = pPreMsg->term;
|
||||||
pReceiver->fromId = pPreMsg->srcId;
|
pReceiver->fromId = pPreMsg->srcId;
|
||||||
pReceiver->startTime = pPreMsg->startTime;
|
pReceiver->startTime = pPreMsg->startTime;
|
||||||
ASSERT(pReceiver->startTime);
|
|
||||||
|
pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
|
||||||
|
pReceiver->snapshotParam.end = -1;
|
||||||
|
|
||||||
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
|
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
|
||||||
}
|
}
|
||||||
|
@ -514,10 +519,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
|
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
|
||||||
if (stopped) return;
|
if (stopped) return;
|
||||||
|
taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
|
||||||
|
{
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
|
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
|
||||||
&pReceiver->snapshot);
|
false, &pReceiver->snapshot);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
|
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
|
||||||
}
|
}
|
||||||
|
@ -527,6 +533,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
syncSnapBufferReset(pReceiver->pRcvBuf);
|
syncSnapBufferReset(pReceiver->pRcvBuf);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
|
|
|
@ -654,6 +654,23 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
|
||||||
|
int64_t ver = -1;
|
||||||
|
int64_t totSize = 0;
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
while (--fileIdx) {
|
||||||
|
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||||
|
if (totSize >= bytes) {
|
||||||
|
ver = pInfo->lastVer;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
totSize += pInfo->fileSize;
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return ver + 1;
|
||||||
|
}
|
||||||
|
|
||||||
int walCheckAndRepairIdx(SWal* pWal) {
|
int walCheckAndRepairIdx(SWal* pWal) {
|
||||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
int32_t fileIdx = sz;
|
int32_t fileIdx = sz;
|
||||||
|
|
Loading…
Reference in New Issue