Merge branch '3.0' into fix/TD-27839-3.0

This commit is contained in:
kailixu 2023-12-14 19:48:13 +08:00
commit b7bfbdd2b3
35 changed files with 341 additions and 190 deletions

View File

@ -107,8 +107,9 @@ extern int32_t tsMonitorMaxLogs;
extern bool tsMonitorComp;
// audit
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern int32_t tsAuditInterval;
// telem
extern bool tsEnableTelem;

View File

@ -23,13 +23,13 @@
#include "tjson.h"
#include "tmsgcb.h"
#include "trpc.h"
#include "mnode.h"
#ifdef __cplusplus
extern "C" {
#endif
#define AUDIT_DETAIL_MAX 65472
#define AUDIT_OPERATION_LEN 20
typedef struct {
const char *server;
@ -37,13 +37,28 @@ typedef struct {
bool comp;
} SAuditCfg;
typedef struct {
int64_t curTime;
char strClusterId[TSDB_CLUSTER_ID_LEN];
char clientAddress[50];
char user[TSDB_USER_LEN];
char operation[AUDIT_OPERATION_LEN];
char target1[TSDB_DB_NAME_LEN]; //put db name
char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max
char* detail;
} SAuditRecord;
int32_t auditInit(const SAuditCfg *pCfg);
void auditCleanup();
void auditSend(SJson *pJson);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
void auditSendRecordsInBatch();
#ifdef __cplusplus
}
#endif
#endif /*_TD_MONITOR_H_*/
#endif /*_TD_AUDIT_H_*/

View File

@ -509,11 +509,8 @@ typedef struct SStreamMeta {
SArray* chkpSaved;
SArray* chkpInUse;
SRWLatch chkpDirLock;
void* qHandle;
int32_t pauseTaskNum;
void* bkdChkptMgt;
void* qHandle;
void* bkdChkptMgt;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -840,11 +837,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);

View File

@ -33,11 +33,12 @@ extern "C" {
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_WAIT_MS 1000 * 5
#define SYNC_WAL_LOG_RETENTION_SIZE (8LL * 1024 * 1024 * 1024)
#define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
@ -219,6 +220,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);

View File

@ -225,6 +225,7 @@ bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
int64_t walGetVerRetention(SWal *pWal, int64_t bytes);
int64_t walGetCommittedVer(SWal *);
int64_t walGetAppliedVer(SWal *);

View File

@ -671,6 +671,13 @@ upload:
continue;
}
if (i > 0 && cp.parts[i - 1].completed) {
if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto clean;
}
}
int seq = cp.parts[i].index + 1;
partData.manager = &manager;

View File

@ -95,8 +95,9 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false;
// audit
bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true;
bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true;
int32_t tsAuditInterval = 5000;
// telem
#ifdef TD_ENTERPRISE
@ -686,6 +687,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -1137,6 +1140,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;
tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval;
tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32;
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;

View File

@ -30,12 +30,14 @@ typedef struct SDnodeMgmt {
TdThread statusThread;
TdThread notifyThread;
TdThread monitorThread;
TdThread auditThread;
TdThread crashReportThread;
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordsFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp;
@ -62,7 +64,9 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt);
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
void dmStopAuditThread(SDnodeMgmt *pMgmt);
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
void dmStopCrashReportThread(SDnodeMgmt *pMgmt);
int32_t dmStartWorker(SDnodeMgmt *pMgmt);

View File

@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartMonitorThread(pMgmt) != 0) {
return -1;
}
if (dmStartAuditThread(pMgmt) != 0) {
return -1;
}
if (dmStartCrashReportThread(pMgmt) != 0) {
return -1;
}
@ -38,6 +41,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
static void dmStopMgmt(SDnodeMgmt *pMgmt) {
pMgmt->pData->stopped = true;
dmStopMonitorThread(pMgmt);
dmStopAuditThread(pMgmt);
dmStopStatusThread(pMgmt);
#if defined(TD_ENTERPRISE)
dmStopNotifyThread(pMgmt);
@ -60,6 +64,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;

View File

@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) {
return NULL;
}
static void *dmAuditThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-audit");
while (1) {
taosMsleep(100);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs();
if (curTime < lastTime) lastTime = curTime;
float interval = curTime - lastTime;
if (interval >= tsAuditInterval) {
(*pMgmt->sendAuditRecordsFp)();
lastTime = curTime;
}
}
return NULL;
}
static void *dmCrashReportThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
@ -218,6 +239,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
return 0;
}
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
dError("failed to create audit thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-audit", "initialized");
return 0;
}
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL);
@ -225,6 +260,13 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
}
}
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->auditThread)) {
taosThreadJoin(pMgmt->auditThread, NULL);
taosThreadClear(&pMgmt->auditThread);
}
}
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
if (!tsEnableCrashReport) {
return 0;

View File

@ -148,6 +148,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c
void dmSendMonitorReport();
void dmSendAuditRecords();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo);

View File

@ -189,6 +189,7 @@ void dmCleanup() {
if (dmCheckRepeatCleanup(pDnode) != 0) return;
dmCleanupDnode(pDnode);
monCleanup();
auditCleanup();
syncCleanUp();
walCleanUp();
udfcClose();
@ -396,6 +397,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport,
.sendAuditRecordFp = auditSendRecordsInBatch,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads,

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
#include "audit.h"
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
@ -108,6 +109,11 @@ void dmSendMonitorReport() {
monSendReport();
}
//Todo: put this in seperate file in the future
void dmSendAuditRecords() {
auditSendRecordsInBatch();
}
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];

View File

@ -86,6 +86,7 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*SendMonitorReportFp)();
typedef void (*SendAuditRecordsFp)();
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
@ -120,6 +121,7 @@ typedef struct {
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp;

View File

@ -704,10 +704,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
SEpSet epset = {0};
if(pTask->info.nodeId == SNODE_HANDLE){
SEpSet epset = {0};
if (pTask->info.nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
}
}else{
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
if (pVgObj != NULL) {
epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
taosMemoryFree(pReq);
return 0;
}
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
return;
}

View File

@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
pEntry->startTime);
taosArrayPush(pList, &info);
} else {

View File

@ -699,7 +699,23 @@ end:
return ret;
}
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -713,15 +729,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamTaskOpenAllUpstreamInput(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pStateTask = pTask;
STaskId taskId = {.streamId = 0, .taskId = 0};
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -731,9 +741,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
@ -754,15 +764,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask;
// SStreamTask task = {0};
STaskId taskId = {.streamId = 0, .taskId = 0};
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -774,15 +778,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
restoreStreamTaskId(pTask, &taskId);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList),
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
@ -828,12 +830,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}
// // reset the task status from unfinished transaction
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
// pTask->status.taskStatus = TASK_STATUS__READY;
// }
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;

View File

@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
}

View File

@ -712,9 +712,9 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
}
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
streamMetaClear(pMeta);
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pMeta);
@ -758,10 +750,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
if (isLeader && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
resetStreamTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
startStreamTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);

View File

@ -115,7 +115,7 @@ static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(writer->ctx->offset == writer->file.size);
ASSERT(writer->ctx->offset <= writer->file.size);
ASSERT(writer->config->fid == writer->file.fid);
STFileOp op = (STFileOp){

View File

@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void resetTableListIndex(SReaderStatus* pStatus);
@ -1466,7 +1466,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pSttBlockReader)) {
if (hasDataInSttBlock(pBlockScanInfo)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
}
@ -1485,7 +1485,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t minKey = 0;
if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
minKey = tsLast;
}
@ -1498,7 +1498,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
} else {
minKey = INT64_MIN;
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
minKey = tsLast;
}
@ -1705,7 +1705,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
bool dataInSttFile = hasDataInSttBlock(pSttBlockReader);
bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo);
if (dataInDataFile && (!dataInSttFile)) {
// no stt file block available, only data block exists
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
@ -1791,7 +1791,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pSttBlockReader)) {
if (hasDataInSttBlock(pBlockScanInfo)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
}
@ -1840,7 +1840,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key;
}
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
minKey = tsLast;
}
} else {
@ -1857,7 +1857,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key;
}
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
minKey = tsLast;
}
}
@ -2065,7 +2065,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
// the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
return hasDataInSttBlock(pSttBlockReader);
return hasDataInSttBlock(pScanInfo);
}
if (pSttBlockReader->uid != 0) {
@ -2158,7 +2158,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
return hasData;
}
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) {
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
}
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
@ -2733,7 +2735,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs();
while (1) {
// no data in stt block and block, no need to proceed.
if (!hasDataInSttBlock(pSttBlockReader)) {
if (!hasDataInSttBlock(pScanInfo)) {
break;
}
@ -2850,7 +2852,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
// no data in stt block, no need to proceed.
while (hasDataInSttBlock(pSttBlockReader)) {
while (hasDataInSttBlock(pScanInfo)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);

View File

@ -252,6 +252,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
pScanInfo->cleanSttBlocks = false;
pScanInfo->numOfRowsInStt = 0;
pScanInfo->sttBlockReturned = false;
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;

View File

@ -1012,7 +1012,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
}
taosStringBuilderDestroy(&sb);
@ -1236,7 +1236,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
char *keyJoined = taosStringBuilderGetResult(&sb, &len);
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len);
auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len);
}
taosStringBuilderDestroy(&sb);

View File

@ -17,9 +17,12 @@
#define _TD_AUDIT_INT_H_
#include "audit.h"
#include "tarray.h"
typedef struct {
SAuditCfg cfg;
SArray *records;
TdThreadMutex lock;
} SAudit;
#endif /*_TD_AUDIT_INT_H_*/

View File

@ -14,6 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "tarray.h"
#include "auditInt.h"
#include "taoserror.h"
#include "thttp.h"
@ -21,25 +23,56 @@
#include "tjson.h"
#include "tglobal.h"
#include "mnode.h"
#include "audit.h"
SAudit tsAudit = {0};
char* tsAuditUri = "/audit";
char* tsAuditBatchUri = "/audit-batch";
int32_t auditInit(const SAuditCfg *pCfg) {
tsAudit.cfg = *pCfg;
tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *));
taosThreadMutexInit(&tsAudit.lock, NULL);
return 0;
}
void auditCleanup() {
tsLogFp = NULL;
taosArrayDestroy(tsAudit.records);
tsAudit.records = NULL;
taosThreadMutexDestroy(&tsAudit.lock);
}
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
extern void auditSendRecordsInBatchImp();
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
}
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
}
void auditSendRecordsInBatch(){
auditSendRecordsInBatchImp();
}
#ifndef TD_ENTERPRISE
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
}
void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
}
void auditSendRecordsInBatchImp(){
}
#endif

View File

@ -5466,13 +5466,12 @@ bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
}
int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
const int32_t BLOCK_DIST_RESULT_ROWS = 24;
const int32_t BLOCK_DIST_RESULT_ROWS = 25;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDistInfo p1 = {0};
tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1);

View File

@ -31,7 +31,6 @@ int32_t streamMetaId = 0;
int32_t taskDbWrapperId = 0;
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
@ -395,41 +394,6 @@ _err:
return NULL;
}
int32_t streamMetaReopen(SStreamMeta* pMeta) {
streamMetaClear(pMeta);
// NOTE: role should not be changed during reopen meta
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(defaultPath);
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
if (code == 0) {
// directory exists
code = taosRenameFile(newPath, defaultPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
tstrerror(terrno));
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return -1;
}
}
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
// todo refactor: the lock shoud be restricted in one function
void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
if (pMeta == NULL) return 0;
return streamMetaLoadAllTasks(pMeta);
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
return -1;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
if (pMeta == NULL) {
return TSDB_CODE_SUCCESS;
}
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
return -1;
}
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {

View File

@ -46,6 +46,7 @@ SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes);
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);

View File

@ -63,7 +63,6 @@ typedef struct SSyncSnapshotSender {
int64_t sendingMS;
SyncTerm term;
int64_t startTime;
int64_t waitTime;
int64_t lastSendTime;
bool finish;

View File

@ -305,6 +305,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
return minMatchIndex;
}
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
}
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
@ -331,7 +335,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
} else {
// vnode
if (pSyncNode->replicaNum > 1) {
// multi replicas
logRetention = SYNC_VNODE_LOG_RETENTION;
}
}
@ -344,7 +347,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
syncNodeRelease(pSyncNode);
return 0;
}
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
SyncIndex retentionIndex =
TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
logRetention += TMAX(0, lastApplyIndex - retentionIndex);
}
_DEL_WAL:

View File

@ -70,6 +70,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogIndexRetention = raftLogIndexRetention;
pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
pLogStore->syncLogGetEntry = raftLogGetEntry;
@ -154,6 +155,15 @@ SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
return lastVer;
}
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes) {
SyncIndex lastIndex;
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetVerRetention(pWal, bytes);
return lastVer;
}
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;

View File

@ -23,8 +23,9 @@
#include "syncReplication.h"
#include "syncUtil.h"
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
if (pBuf->entryDeleteCb) {
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
@ -34,7 +35,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
pBuf->end = pBuf->start;
pBuf->cursor = pBuf->start - 1;
taosThreadMutexUnlock(&pBuf->mutex);
}
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
@ -81,7 +81,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->replicaIndex = replicaIndex;
pSender->term = raftStoreGetTerm(pSyncNode);
pSender->startTime = -1;
pSender->waitTime = -1;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
@ -108,6 +107,19 @@ void syncSnapBlockDestroy(void *ptr) {
taosMemoryFree(pBlk);
}
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
if (pSender->snapshotParam.data) {
taosMemoryFree(pSender->snapshotParam.data);
pSender->snapshotParam.data = NULL;
}
if (pSender->snapshot.data) {
taosMemoryFree(pSender->snapshot.data);
pSender->snapshot.data = NULL;
}
return 0;
}
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return;
@ -122,10 +134,8 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
syncSnapBufferDestroy(&pSender->pSndBuf);
}
if (pSender->snapshotParam.data) {
taosMemoryFree(pSender->snapshotParam.data);
pSender->snapshotParam.data = NULL;
}
snapshotSenderClearInfoData(pSender);
// free sender
taosMemoryFree(pSender);
}
@ -198,20 +208,24 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// update flag
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
if (stopped) return;
taosThreadMutexLock(&pSender->pSndBuf->mutex);
{
pSender->finish = finish;
pSender->finish = finish;
pSender->waitTime = -1;
// close reader
if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
pSender->pReader = NULL;
}
// close reader
if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
pSender->pReader = NULL;
syncSnapBufferReset(pSender->pSndBuf);
snapshotSenderClearInfoData(pSender);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
@ -324,6 +338,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
taosThreadMutexLock(&pSndBuf->mutex);
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
goto _out;
}
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
@ -338,6 +355,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pBlk->sendTimeMs = nowMs;
}
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (snapshotSend(pSender) != 0) {
goto _out;
}
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
goto _out;
@ -361,14 +384,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 0;
}
int64_t timeNow = taosGetTimestampMs();
if (pSender->waitTime <= 0) {
pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS;
}
if (timeNow < pSender->waitTime) {
sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore");
return 0;
}
taosMsleep(1);
int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
@ -416,6 +432,19 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
return pReceiver;
}
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->snapshotParam.data) {
taosMemoryFree(pReceiver->snapshotParam.data);
pReceiver->snapshotParam.data = NULL;
}
if (pReceiver->snapshot.data) {
taosMemoryFree(pReceiver->snapshot.data);
pReceiver->snapshot.data = NULL;
}
return 0;
}
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver == NULL) return;
@ -429,22 +458,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
pReceiver->pWriter = NULL;
}
// free data of snapshot info
if (pReceiver->snapshotParam.data) {
taosMemoryFree(pReceiver->snapshotParam.data);
pReceiver->snapshotParam.data = NULL;
}
if (pReceiver->snapshot.data) {
taosMemoryFree(pReceiver->snapshot.data);
pReceiver->snapshot.data = NULL;
}
// free snap buf
if (pReceiver->pRcvBuf) {
syncSnapBufferDestroy(&pReceiver->pRcvBuf);
}
snapshotReceiverClearInfoData(pReceiver);
// free receiver
taosMemoryFree(pReceiver);
}
@ -504,7 +524,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->term = pPreMsg->term;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime);
pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
pReceiver->snapshotParam.end = -1;
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
@ -514,19 +536,24 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return;
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
}
pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
syncSnapBufferReset(pReceiver->pRcvBuf);
syncSnapBufferReset(pReceiver->pRcvBuf);
snapshotReceiverClearInfoData(pReceiver);
}
taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
@ -656,7 +683,10 @@ static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshot
memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
// exchange snap info
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo);
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo) != 0) {
sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
goto _out;
}
SSyncTLV *datHead = pInfo->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);

View File

@ -654,6 +654,23 @@ _err:
return -1;
}
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
int64_t ver = -1;
int64_t totSize = 0;
taosThreadMutexLock(&pWal->mutex);
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
while (--fileIdx) {
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (totSize >= bytes) {
ver = pInfo->lastVer;
break;
}
totSize += pInfo->fileSize;
}
taosThreadMutexUnlock(&pWal->mutex);
return ver + 1;
}
int walCheckAndRepairIdx(SWal* pWal) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
int32_t fileIdx = sz;

View File

@ -1097,7 +1097,8 @@ int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t
c1 = str[j++];
++nMatchChar;
if (c == '\\' && pattern[i] == '_' && c1 == '_') {
if (c == '\\' && pattern[i] == c1 &&
(c1 == '_' || c1 == '%')) {
i++;
continue;
}

View File

@ -204,6 +204,11 @@ TEST(utilTest, char_pattern_match_test) {
const char* str12 = NULL;
ret = patternMatch(pattern12, 4, str12, 0, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOMATCH);
const char* pattern13 = "a\\%c";
const char* str13 = "a%c";
ret = patternMatch(pattern13, 5, str13, strlen(str13), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
}
TEST(utilTest, char_pattern_match_no_terminated) {