Merge pull request #23341 from taosdata/enh/refactorBackend

Enh/refactor backend
This commit is contained in:
Haojun Liao 2023-11-30 13:48:18 +08:00 committed by GitHub
commit e9ead3bd28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 3320 additions and 1655 deletions

View File

@ -151,6 +151,7 @@ IF(${BUILD_S3})
IF(${BUILD_WITH_S3})
add_definitions(-DUSE_S3)
option(BUILD_WITH_COS "If build with cos" OFF)
ELSE ()

View File

@ -43,6 +43,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
#ifdef __cplusplus
}

View File

@ -75,6 +75,9 @@ extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout;
// vnode
extern int64_t tsVndCommitMaxIntervalMs;
// snode
extern int32_t tsRsyncPort;
extern char tsCheckpointBackupDir[];

View File

@ -186,6 +186,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)

View File

@ -35,6 +35,7 @@ int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState, bool remove);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
int32_t streamStateDelTaskDb(SStreamState* pState);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);

View File

@ -59,6 +59,8 @@ typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
#define SSTREAM_TASK_VER 2
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
enum {
STREAM_STATUS__NORMAL = 0,
@ -110,6 +112,7 @@ typedef enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
TASK_LEVEL_SMA,
} ETASK_LEVEL;
enum {
@ -304,11 +307,16 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t processedVer; // already processed ver, that has generated results version.
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
int64_t checkpointingId;
int32_t downstreamAlignNum;
int32_t checkpointNotReadyTasks;
bool dispatchCheckpointTrigger;
int64_t msgVer;
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -447,12 +455,11 @@ struct SStreamTask {
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
int32_t checkpointNotReadyTasks;
int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
int64_t backendRefId;
char reserve[256];
};
@ -490,20 +497,25 @@ typedef struct SStreamMeta {
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
SHashObj* pTaskDbUnique;
TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks;
int64_t rid;
int64_t chkpId;
int32_t chkpCap;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
void* qHandle;
int32_t pauseTaskNum;
void* bkdChkptMgt;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -834,8 +846,10 @@ int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
@ -857,6 +871,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus
}
#endif

View File

@ -467,6 +467,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
// data.infileFD = NULL;
// data.noStatus = noStatus;
// uError("ERROR: %s stat file %s: ", __func__, file);
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
@ -1354,5 +1355,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; }
#endif

View File

@ -361,9 +361,17 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *input
}
if (taosIsDir(cfgDir)) {
#ifdef CUS_PROMPT
snprintf(cfgFile, sizeof(cfgFile), "%s" "%s" "%s.cfg", cfgDir, tdDirsep, CUS_PROMPT);
snprintf(cfgFile, sizeof(cfgFile),
"%s"
"%s"
"%s.cfg",
cfgDir, tdDirsep, CUS_PROMPT);
#else
snprintf(cfgFile, sizeof(cfgFile), "%s" "%s" "taos.cfg", cfgDir, tdDirsep);
snprintf(cfgFile, sizeof(cfgFile),
"%s"
"%s"
"taos.cfg",
cfgDir, tdDirsep);
#endif
} else {
tstrncpy(cfgFile, cfgDir, sizeof(cfgDir));
@ -729,6 +737,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;

View File

@ -700,6 +700,11 @@ typedef struct {
} SStreamObj;
typedef struct SStreamSeq {
char name[24];
uint64_t seq;
SRWLatch lock;
} SStreamSeq;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj);
@ -738,7 +743,6 @@ int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj);
int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver);
void tFreeSViewObj(SViewObj* pObj);
#ifdef __cplusplus
}
#endif

View File

@ -28,8 +28,18 @@ typedef struct SStreamTransInfo {
const char *name;
} SStreamTransInfo;
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
char * pName;
int64_t streamId;
int64_t checkpointTs;
int64_t checkpointId;
} SCheckpointCandEntry;
typedef struct SStreamTransMgmt {
SHashObj *pDBTrans;
SHashObj *pWaitingList; // stream id list, of which timed checkpoint failed to be issued due to the trans conflict.
} SStreamTransMgmt;
typedef struct SStreamExecInfo {
@ -52,6 +62,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pName, const char *pSrcDb, const char *pDstDb);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock);
// for sma

View File

@ -146,6 +146,15 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
}
}
static void mndStreamCheckpointRemain(SMnode* pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, 0);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void mndStreamCheckNode(SMnode* pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
@ -286,6 +295,10 @@ static void *mndThreadFp(void *param) {
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % 5 == 0) {
mndStreamCheckpointRemain(pMnode);
}
if (sec % tsStreamNodeCheckInterval == 0) {
mndStreamCheckNode(pMnode);
}

View File

@ -18,14 +18,12 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndSnode.h"
#include "mndPrivilege.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "osMemory.h"
#include "parser.h"
@ -65,6 +63,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq);
static int32_t mndProcessStreamHb(SRpcMsg *pReq);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
@ -93,10 +92,17 @@ static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNo
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void *);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw);
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -107,6 +113,15 @@ int32_t mndInitStream(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete,
};
SSdbTable tableSeq = {
.sdbType = SDB_STREAM_SEQ,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
.decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
.insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
.updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
@ -123,6 +138,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
@ -141,14 +157,23 @@ int32_t mndInitStream(SMnode *pMnode) {
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
return sdbSetTable(pMnode->pSdb, table);
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
}
if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) {
return -1;
}
return 0;
}
void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execInfo.pTaskList);
taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}
@ -195,7 +220,8 @@ STREAM_ENCODE_OVER:
return NULL;
}
mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream);
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
pStream->checkpointId);
return pRaw;
}
@ -248,7 +274,8 @@ STREAM_DECODE_OVER:
return NULL;
}
mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream);
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
pStream->checkpointId);
return pRow;
}
@ -274,6 +301,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
pOldStream->status = pNewStream->status;
pOldStream->updateTime = pNewStream->updateTime;
pOldStream->checkpointId = pNewStream->checkpointId;
pOldStream->checkpointFreq = pNewStream->checkpointFreq;
taosWUnLockLatch(&pOldStream->lock);
return 0;
@ -308,6 +337,12 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
}
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
@ -918,8 +953,11 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
mDebug("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId);
sdbRelease(pSdb, pStream);
}
mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1);
return maxChkpId + 1;
}
@ -939,6 +977,22 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = 0;
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) {
SStreamCheckpointSourceReq req = {0};
@ -981,107 +1035,104 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return 0;
}
// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
// int64_t timestampMs = taosGetTimestampMs();
// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) {
// return -1;
// }
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
// if (pTrans == NULL) return -1;
// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
// if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name,
// checkpointId,
// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
// mndTransDrop(pTrans);
// return -1;
// }
// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// atomic_store_64(&pStream->currentTick, 1);
// taosWLockLatch(&pStream->lock);
// // 1. redo action: broadcast checkpoint source msg for all source vg
// int32_t totLevel = taosArrayGetSize(pStream->tasks);
// for (int32_t i = 0; i < totLevel; i++) {
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// int32_t sz = taosArrayGetSize(pLevel);
// for (int32_t j = 0; j < sz; j++) {
// SStreamTask *pTask = taosArrayGetP(pLevel, j);
// /*A(pTask->info.nodeId > 0);*/
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
// if (pVgObj == NULL) {
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
int32_t code = -1;
int64_t timestampMs = taosGetTimestampMs();
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) {
return -1;
}
// void *buf;
// int32_t tlen;
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
// pTask->id.taskId) < 0) {
// mndReleaseVgroup(pMnode, pVgObj);
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
pStream->name, pStream->uid);
return -1;
}
// STransAction action = {0};
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
// action.pCont = buf;
// action.contLen = tlen;
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
return -1;
}
// mndReleaseVgroup(pMnode, pVgObj);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->sourceDb, pStream->targetDb);
// if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// taosMemoryFree(buf);
// taosWUnLockLatch(&pStream->lock);
// mndReleaseStream(pMnode, pStream);
// mndTransDrop(pTrans);
// return -1;
// }
// }
// }
// }
// // 2. reset tick
// pStream->checkpointFreq = checkpointId;
// pStream->checkpointId = checkpointId;
// pStream->checkpointFreq = taosGetTimestampMs();
// atomic_store_64(&pStream->currentTick, 0);
// // 3. commit log: stream checkpoint info
// pStream->version = pStream->version + 1;
// taosWUnLockLatch(&pStream->lock);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
goto _ERR;
}
// // // code condtion
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
// if (pCommitRaw == NULL) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
taosWLockLatch(&pStream->lock);
pStream->currentTick = 1;
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// mndTransDrop(pTrans);
// return 0;
// _ERR:
// mndTransDrop(pTrans);
// return -1;
// }
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *p = taosArrayGetP(pLevel, 0);
if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
STransAction act = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (mndTransAppendRedoAction(pTrans, &act) != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
}
}
}
// 2. reset tick
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
pStream->currentTick = 0;
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
if ((code = mndPersistTransLog(pStream, pTrans)) != TSDB_CODE_SUCCESS) {
return code;
}
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
code = 0;
_ERR:
mndTransDrop(pTrans);
return code;
}
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) {
taosWLockLatch(&pStream->lock);
@ -1157,22 +1208,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return 0;
}
static const char *mndGetStreamDB(SMnode *pMnode) {
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void * pIter = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
return NULL;
}
const char *p = taosStrdup(pStream->sourceDb);
mndReleaseStream(pMnode, pStream);
sdbCancelFetch(pSdb, pIter);
return p;
}
static int32_t initStreamNodeList(SMnode *pMnode) {
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
@ -1185,8 +1220,8 @@ static int32_t initStreamNodeList(SMnode* pMnode) {
static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
int32_t numOfNodes = initStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
@ -1226,29 +1261,17 @@ static bool taskNodeIsUpdated(SMnode* pMnode) {
return nodeUpdated;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
// check if the node update happens or not
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mWarn("checkpoint ignore, stream task nodes update detected");
static int32_t mndCheckNodeStatus(SMnode *pMnode) {
bool ready = true;
int64_t ts = taosGetTimestampSec();
if (taskNodeIsUpdated(pMnode)) {
return -1;
}
{ // check if all tasks are in TASK_STATUS__READY status
bool ready = true;
taosThreadMutexLock(&execInfo.lock);
// no streams exists, abort
int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList);
if (numOfTasks <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
@ -1265,58 +1288,77 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
break;
}
}
taosThreadMutexUnlock(&execInfo.lock);
if (!ready) {
return 0;
return ready ? 0 : -1;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
return code;
}
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, pDb);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb);
taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId);
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId);
sdbRelease(pSdb, pStream);
if (code == -1) {
break;
}
}
if (code == 0) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
}
return code;
}
mndTransDrop(pTrans);
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void * pIter = NULL;
int32_t code = 0;
// only one trans here
taosThreadMutexLock(&execInfo.lock);
execInfo.activeCheckpoint = checkpointId;
int32_t num = taosHashGetSize(execInfo.transMgmt.pWaitingList);
taosThreadMutexUnlock(&execInfo.lock);
if (num == 0) {
return code;
}
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
return code;
}
SArray *pList = taosArrayInit(4, sizeof(int64_t));
while ((pIter = taosHashIterate(execInfo.transMgmt.pWaitingList, pIter)) != NULL) {
SCheckpointCandEntry *pEntry = pIter;
SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName);
if (ps == NULL) {
continue;
}
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId);
mndReleaseStream(pMnode, ps);
if (code == TSDB_CODE_SUCCESS) {
taosArrayPush(pList, &pEntry->streamId);
}
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int64_t *pId = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pWaitingList, pId, sizeof(*pId));
}
int32_t remain = taosHashGetSize(execInfo.transMgmt.pWaitingList);
mDebug("%d in candidate list generated checkpoint, remaining:%d", (int32_t)taosArrayGetSize(pList), remain);
taosArrayDestroy(pList);
return code;
}
@ -2155,8 +2197,8 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent)
// 1. increase the replica does not affect the stream process.
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
// tasks on the will be removed replica.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will
// handle it as mentioned in 1 & 2 items.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
// will handle it as mentioned in 1 & 2 items.
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
SVgroupChangeInfo info = {
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
@ -2184,8 +2226,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
taosArrayPush(info.pUpdateNodeList, &updateInfo);
}
if (pCurrent->nodeId != SNODE_HANDLE) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
@ -2400,7 +2440,6 @@ static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
@ -2476,6 +2515,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
return 0;
}
// kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
@ -2675,6 +2715,7 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosWUnLockLatch(&pStream->lock);
return terrno;
}
@ -2761,8 +2802,8 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb);
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetDb);
continue;
}
@ -2773,7 +2814,6 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
return code;
}
}
return 0;
}
@ -2890,8 +2930,8 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int64_t stage) {
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %"PRId64 " to %"PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
@ -2947,9 +2987,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if(pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
if (pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
@ -3026,6 +3064,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndTrans.h"
#include "mndStream.h"
#include "mndTrans.h"
typedef struct SKeyInfo {
void* pKey;
@ -110,5 +110,24 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const cha
return false;
}
int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) {
SCheckpointCandEntry* pEntry = taosHashGet(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid));
if (pEntry == NULL) {
SCheckpointCandEntry entry = {.streamId = pStream->uid,
.checkpointTs = taosGetTimestampMs(),
.checkpointId = checkpointId,
.pName = taosStrdup(pStream->name)};
taosHashPut(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid), &entry, sizeof(entry));
int32_t size = taosHashGetSize(execInfo.transMgmt.pWaitingList);
mDebug("stream:%" PRIx64 " add into waiting list due to conflict, ts:%" PRId64 " , checkpointId: %" PRId64
", total in waitingList:%d",
pStream->uid, entry.checkpointTs, checkpointId, size);
} else {
mDebug("stream:%" PRIx64 " ts:%" PRId64 ", checkpointId:%" PRId64 " already in waiting list, no need to add into",
pStream->uid, pEntry->checkpointTs, checkpointId);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -149,7 +149,8 @@ typedef enum {
SDB_FUNC = 20,
SDB_IDX = 21,
SDB_VIEW = 22,
SDB_MAX = 23
SDB_STREAM_SEQ = 23,
SDB_MAX = 24
} ESdbType;
typedef struct SSdbRaw {

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rsync.h"
#include "executor.h"
#include "rsync.h"
#include "sndInt.h"
#include "tqCommon.h"
#include "tuuid.h"
@ -46,6 +46,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pBackend = NULL;
streamTaskOpenAllUpstreamInput(pTask);

View File

@ -297,6 +297,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
@ -651,9 +653,7 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL,
.pCont = pBuf,
.contLen = len + sizeof(SMsgHead)};
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -870,8 +870,8 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
* @param level
* @return int32_t
*/
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo,
ERsmaExecType type, int8_t level) {
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
int32_t idx = level - 1;
void * qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
@ -887,8 +887,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64 ", inputType:%d", SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
", inputType:%d",
SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
@ -1282,11 +1283,12 @@ _checkpoint:
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask;
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->checkpointingId = checkpointId;
pTask->chkInfo.checkpointId = pTask->checkpointingId;
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->chkInfo.checkpointingId = checkpointId;
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
pTask->info.triggerParam = pItem->fetchResultVer;
pTask->info.taskLevel = TASK_LEVEL_SMA;
if (!checkpointBuilt) {
// the stream states share one checkpoint

View File

@ -750,21 +750,27 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pStateTask = pTask;
SStreamTask task = {0};
STaskId taskId = {.streamId = 0, .taskId = 0};
if (pTask->info.fillHistory) {
task.id.streamId = pTask->streamTaskId.streamId;
task.id.taskId = pTask->streamTaskId.taskId;
task.pMeta = pTask->pMeta;
pStateTask = &task;
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateTask, false, -1, -1);
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
@ -785,15 +791,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask;
SStreamTask task = {0};
// SStreamTask task = {0};
STaskId taskId = {.streamId = 0, .taskId = 0};
if (pTask->info.fillHistory) {
task.id.streamId = pTask->streamTaskId.streamId;
task.id.taskId = pTask->streamTaskId.taskId;
task.pMeta = pTask->pMeta;
pSateTask = &task;
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
@ -801,6 +809,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
@ -1280,12 +1293,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
pTask->checkpointingId = req.checkpointId;
pTask->chkInfo.checkpointingId = req.checkpointId;
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure",
pTask->id.idStr, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
@ -1316,10 +1328,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
ASSERT(pTask->checkpointingId == req.checkpointId);
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->checkpointingId);
pTask->id.idStr, pTask->chkInfo.checkpointingId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
@ -1335,10 +1347,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
total = pMeta->numOfStreamTasks;
streamMetaWUnLock(pMeta);

View File

@ -104,8 +104,8 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
pHdr->size = len;
memcpy(pHdr->data, rowData, len);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
taosMemoryFree(rowData);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
return code;
_err:
@ -139,7 +139,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL;
@ -167,25 +167,19 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
return code;
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
streamMetaWLock(pWriter->pTq->pStreamMeta);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) {
streamMetaInitBackend(pWriter->pTq->pStreamMeta);
code = streamStateLoadTasks(pWriter);
}
streamMetaWUnLock(pWriter->pTq->pStreamMeta);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter);
return code;
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = streamStateLoadTasks(pWriter);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter);
return code;
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
}

View File

@ -238,7 +238,6 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
goto _err;
}
tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
int64_t key[2] = {taskId.streamId, taskId.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock);

View File

@ -1402,11 +1402,13 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
void* pBuf = NULL;
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
if (code == 0) {
TSKEY ts = *(TSKEY*)pBuf;
taosMemoryFree(pBuf);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts);
}
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
@ -1822,9 +1824,9 @@ void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SAr
}
}
int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
SSHashObj* pStDeleted) {
int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, uint64_t groupId, int32_t rows, int32_t start, int64_t gap,
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
return i - start;
@ -1981,9 +1983,10 @@ static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
qDebug("===stream===try save session result skey:%" PRId64 ", ekey:%" PRId64 ".pos%d",
pWinInfo->sessionWin.win.skey, pWinInfo->sessionWin.win.ekey, pWinInfo->pStatePos->needFree);
return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos, pAggSup->resultRowSize);
qDebug("===stream===try save session result skey:%" PRId64 ", ekey:%" PRId64 ".pos%d", pWinInfo->sessionWin.win.skey,
pWinInfo->sessionWin.win.ekey, pWinInfo->pStatePos->needFree);
return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos,
pAggSup->resultRowSize);
}
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
@ -2045,7 +2048,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
code = saveResult(winInfo, pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s do stream session aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
qError("%s do stream session aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo),
tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
@ -2716,7 +2720,8 @@ void resetWinRange(STimeWindow* winRange) {
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t rowSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
int32_t code =
pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
if (code == TSDB_CODE_SUCCESS) {
pWinInfo->sessionWin = *pKey;
pWinInfo->isOutput = true;
@ -2924,7 +2929,8 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
}
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete) {
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
SSHashObj* pMapDelete) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
doDeleteTimeWindows(pAggSup, pBlock, pWins);
removeSessionResults(pAggSup, pMapUpdate, pWins);
@ -3057,8 +3063,9 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->operatorType = pPhyNode->type;
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
}
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
@ -3187,8 +3194,8 @@ void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SS
pNextWin->winInfo.isOutput = true;
}
pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey,
pNextWin->winInfo.sessionWin.win.ekey);
qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64,
pNextWin->winInfo.sessionWin.win.skey, pNextWin->winInfo.sessionWin.win.ekey);
}
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
@ -3257,13 +3264,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pNextWin->winInfo.isOutput = true;
}
pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===set state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey,
pNextWin->winInfo.sessionWin.win.ekey);
qDebug("===stream===set state next win buff. skey:%" PRId64 ", endkey:%" PRId64,
pNextWin->winInfo.sessionWin.win.skey, pNextWin->winInfo.sessionWin.win.ekey);
}
int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin,
TSKEY* pTs, uint64_t groupId, SColumnInfoData* pKeyCol, int32_t rows, int32_t start,
bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
*allEqual = true;
for (int32_t i = start; i < rows; ++i) {
char* pKeyData = colDataGetData(pKeyCol, i);
@ -3475,6 +3482,7 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
}
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
@ -3617,7 +3625,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
compactTimeWindow(pSup, &pInfo->streamAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, pNextWin, pStUpdated, pStDeleted, false);
compactTimeWindow(pSup, &pInfo->streamAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, pNextWin, pStUpdated, pStDeleted,
false);
}
void streamStateReloadState(SOperatorInfo* pOperator) {
@ -4010,8 +4019,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;

View File

@ -17,6 +17,7 @@
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "rocksdb/c.h"
//#include "streamInt.h"
#include "streamState.h"
#include "tcoding.h"
#include "tcommon.h"
@ -42,15 +43,110 @@ typedef struct {
TdThreadMutex cfMutex;
SHashObj* cfInst;
int64_t defaultCfInit;
} SBackendWrapper;
typedef struct {
void* tableOpt;
} RocksdbCfParam;
typedef struct {
rocksdb_t* db;
rocksdb_writeoptions_t* writeOpt;
rocksdb_readoptions_t* readOpt;
rocksdb_options_t* dbOpt;
rocksdb_env_t* env;
rocksdb_cache_t* cache;
rocksdb_column_family_handle_t** pCf;
rocksdb_comparator_t** pCompares;
rocksdb_options_t** pCfOpts;
RocksdbCfParam* pCfParams;
rocksdb_compactionfilterfactory_t* filterFactory;
TdThreadMutex mutex;
char* idstr;
char* path;
int64_t refId;
void* pTask;
int64_t streamId;
int64_t taskId;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
TdThreadRwlock chkpDirLock;
int64_t dataWritten;
} STaskDbWrapper;
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
TdThreadRwlock rwLock;
} SDbChkp;
typedef struct {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
SHashObj* pDbChkpTbl;
TdThreadRwlock rwLock;
} SBkdMgt;
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId);
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);
@ -146,5 +242,20 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
// void taskDbDestroy(void* pDb, bool flush);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
#endif

View File

@ -68,6 +68,12 @@ typedef struct SStreamContinueExecInfo {
SRpcMsg msg;
} SStreamContinueExecInfo;
typedef struct {
int64_t streamId;
int64_t taskId;
int64_t chkpId;
char* dbPrefixPath;
} SStreamTaskSnap;
struct STokenBucket {
int32_t numCapacity; // total capacity, available token per second
int32_t numOfToken; // total available tokens
@ -89,6 +95,7 @@ struct SStreamQueue {
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
@ -106,6 +113,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
@ -117,7 +126,8 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
@ -132,6 +142,17 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
// <<<<<<< HEAD
// void streamClearChkptReadyMsg(SStreamTask* pTask);
// int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const
// char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo*
// pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
// void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// =======
// >>>>>>> 3.0
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
@ -151,10 +172,14 @@ int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
#ifdef __cplusplus
}
#endif

View File

@ -64,8 +64,8 @@ typedef struct SStreamEventInfo {
const char* name;
} SStreamEventInfo;
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
// SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
// void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -15,8 +15,16 @@
#include "cos.h"
#include "rsync.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
typedef struct {
UPLOAD_TYPE type;
char* taskId;
int64_t chkpId;
SStreamTask* pTask;
} SAsyncUploadArg;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -95,12 +103,12 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
if (old == 0) {
stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
}
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
}
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
@ -118,7 +126,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
}
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->checkpointingId;
pBlock->info.version = pTask->chkInfo.checkpointingId;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
@ -141,13 +149,12 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.checkpointingId = pReq->checkpointId;
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already.
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return code;
}
@ -171,13 +178,12 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
int64_t checkpointId = pDataBlock->info.version;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
// set task status
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
pTask->checkpointingId = checkpointId;
pTask->chkInfo.checkpointingId = checkpointId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
@ -185,26 +191,15 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
}
}
{ // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta;
streamMetaWLock(pMeta);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
streamMetaWUnLock(pMeta);
}
// todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
@ -233,15 +228,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
id, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
streamTaskBuildCheckpoint(pTask);
} else {
stDebug(
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
"downstream",
id, num);
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id,
num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
@ -260,7 +253,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
ASSERT(notReady >= 0);
if (notReady == 0) {
@ -276,11 +269,11 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
}
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->checkpointNotReadyTasks = 0;
pTask->checkpointAlignCnt = 0;
pTask->chkInfo.checkpointNotReadyTasks = 0;
// pTask->chkInfo.checkpointAlignCnt = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
@ -288,88 +281,174 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
}
}
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
SStreamMeta* pMeta = p->pMeta;
int32_t vgId = pMeta->vgId;
const char* id = p->id.idStr;
int32_t code = 0;
streamMetaWLock(pMeta);
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
if (ppTask == NULL) {
continue;
}
SStreamTask* p = *ppTask;
if (p->info.fillHistory == 1) {
continue;
return code;
}
ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
if (p->info.taskLevel > TASK_LEVEL__SINK) {
return code;
}
p->chkInfo.checkpointId = p->checkpointingId;
taosThreadMutexLock(&p->lock);
ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
streamTaskClearCheckInfo(p, false);
char* str = NULL;
streamTaskGetStatus(p, &str);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
streamMetaWUnLock(pMeta);
stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId);
return -1;
} else { // save the task
streamMetaSaveTask(pMeta, p);
}
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
str);
}
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
code = streamMetaCommit(pMeta);
if (code < 0) {
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
checkpointId, terrstr());
} else {
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
}
// save the task if not sink task
if (p->info.taskLevel != TASK_LEVEL__SINK) {
streamMetaWLock(pMeta);
code = streamMetaSaveTask(pMeta, p);
if (code != TSDB_CODE_SUCCESS) {
streamMetaWUnLock(pMeta);
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, checkpointId, terrstr());
return code;
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = 0;
code = streamMetaCommit(pMeta);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s",
id, vgId, checkpointId, terrstr());
}
// check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
streamMetaWUnLock(pMeta);
}
return code;
}
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
if (remain == 0) { // all tasks are ready
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
stInfo(
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
"checkpointId:%" PRId64,
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
void streamTaskSetFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
}
int32_t getChkpMeta(char* id, char* path, SArray* list) {
char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
int32_t code = downloadCheckpointByName(id, "META", file);
if (code != 0) {
stDebug("chkp failed to download meta file:%s", file);
taosMemoryFree(file);
return code;
}
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
stError("chkp failed to read meta file:%s", file);
code = -1;
} else {
stInfo(
"vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec "
"not ready:%d/%d",
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks);
int32_t len = strlen(buf);
for (int i = 0; i < len; i++) {
if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1);
memcpy(item, buf, i);
taosArrayPush(list, &item);
item = taosMemoryCalloc(1, len - i);
memcpy(item, buf + i + 1, len - i - 1);
taosArrayPush(list, &item);
}
}
}
taosCloseFile(&pFile);
taosRemoveFile(file);
taosMemoryFree(file);
return code;
}
int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (arg->type == UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
}
}
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0) {
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) {
char* p = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(arg->taskId, p);
stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p);
if (code != 0) {
break;
}
}
}
taosArrayDestroyP(toDelFiles, taosMemoryFree);
taosRemoveDir(path);
taosMemoryFree(path);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg);
return code;
}
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
UPLOAD_TYPE type = getUploadType();
if (type == UPLOAD_DISABLE) {
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
arg->chkpId = chkpId;
arg->pTask = pTask;
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno));
}
}
// send check point response to upstream task
if (code == TSDB_CODE_SUCCESS) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
@ -379,8 +458,39 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
pTask->checkpointingId, tstrerror(code));
ckId, tstrerror(code));
}
}
// clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully
if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
tstrerror(terrno));
} else {
code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr);
if (code != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId);
}
}
}
if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetFailedId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr,
ckId);
}
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ",
pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
return code;
}
@ -390,6 +500,7 @@ static int uploadCheckpointToS3(char* id, char* path) {
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
s3Init();
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
@ -409,12 +520,24 @@ static int uploadCheckpointToS3(char* id, char* path) {
return -1;
}
stDebug("[s3] upload checkpoint:%s", filename);
// break;
}
taosCloseDir(&pDir);
return 0;
}
static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
int code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) {
code = -1;
}
taosMemoryFree(buf);
return code;
}
UPLOAD_TYPE getUploadType() {
if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC;
@ -438,6 +561,20 @@ int uploadCheckpoint(char* id, char* path) {
return 0;
}
// fileName: CURRENT
int downloadCheckpointByName(char* id, char* fname, char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("uploadCheckpointByName parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return 0;
} else if (tsS3StreamEnabled) {
return downloadCheckpointByNameS3(id, fname, dstName);
}
return 0;
}
int downloadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid");

View File

@ -1167,10 +1167,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
", set the current checkpoint failed, and send rsp to mnode",
id, pTask->checkpointingId);
id, pTask->chkInfo.checkpointingId);
{ // send checkpoint failure msg to mnode directly
pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id
pTask->checkpointingId = pTask->checkpointingId;
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; // record the latest failed checkpoint id
pTask->chkInfo.checkpointingId = pTask->chkInfo.checkpointingId;
streamTaskSendCheckpointSourceRsp(pTask);
}
} else {

View File

@ -18,6 +18,7 @@
#include "streamInt.h"
#include "tmisce.h"
#include "tref.h"
#include "tsched.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
@ -27,6 +28,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
int32_t taskDbWrapperId = 0;
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
@ -55,6 +57,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
@ -62,6 +65,7 @@ static void streamMetaEnvInit() {
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
@ -106,6 +110,174 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
return 0;
}
typedef struct {
int64_t chkpId;
char* path;
char* taskId;
SArray* pChkpSave;
SArray* pChkpInUse;
int8_t chkpCap;
void* backend;
} StreamMetaTaskState;
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
return -1;
// goto _err;
}
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
return -1;
}
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
return -1;
}
return 0;
}
//
// impl later
//
enum STREAM_STATE_VER {
STREAM_STATA_NO_COMPATIBLE,
STREAM_STATA_COMPATIBLE,
STREAM_STATA_NEED_CONVERT,
};
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
// no task info, no stream
return ret;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
}
SDecoder decoder;
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue;
}
if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
ret = STREAM_STATA_NO_COMPATIBLE;
} else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) {
ret = STREAM_STATA_NEED_CONVERT;
} else if (info.msgVer == SSTREAM_TASK_VER) {
ret = STREAM_STATA_COMPATIBLE;
}
tDecoderClear(&decoder);
break;
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return ret;
}
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
int32_t code = 0;
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId);
if (exist == false) {
return code;
}
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId);
void* pIter = taosHashIterate(pBackend->cfInst, NULL);
while (pIter) {
void* key = taosHashGetKey(pIter, NULL);
code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) {
qError("failed to cvt data");
goto _EXIT;
}
pIter = taosHashIterate(pBackend->cfInst, pIter);
}
_EXIT:
streamBackendCleanup((void*)pBackend);
if (code == 0) {
char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32);
sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(state);
taosMemoryFree(state);
}
return code;
}
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckBackendCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) {
return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qInfo("stream state need covert backend format");
return streamMetaCvtDbFormat(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
return -1;
}
return 0;
}
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
SStreamTask* pTask = arg;
int64_t chkpId = pTask->chkInfo.checkpointId;
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
taskDbAddRef(*ppBackend);
STaskDbWrapper* pBackend = *ppBackend;
pTask->backendRefId = pBackend->refId;
pTask->pBackend = pBackend;
taosThreadMutexUnlock(&pMeta->backendMutex);
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0;
}
STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId);
if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex);
return -1;
}
int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
pTask->backendRefId = tref;
pTask->pBackend = pBackend;
pBackend->refId = tref;
pBackend->pTask = pTask;
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -121,15 +293,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
if (streamMetaOpenTdb(pMeta) < 0) {
goto _err;
}
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
@ -176,44 +344,38 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
// send heartbeat every 5sec.
pMeta->rid = taosAddRef(streamMetaId, pMeta);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
*pRid = pMeta->rid;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 2;
taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
while (pMeta->streamBackend == NULL) {
taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
if (pMeta->streamBackend == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
code = streamBackendLoadCheckpointInfo(pMeta);
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// while (pMeta->streamBackend == NULL) {
// qError("vgId:%d failed to init stream backend", pMeta->vgId);
// taosMsleep(2 * 1000);
// qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// if (pMeta->streamBackend == NULL) {
// }
// }
// pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
pMeta->rid = taosAddRef(streamMetaId, pMeta);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
return pMeta;
_err:
@ -311,14 +473,13 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap);
taosHashClear(pMeta->pTaskBackendUnique);
taosHashClear(pMeta->pTaskDbUnique);
taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
pMeta->chkptNotReadyTasks = 0;
// the willrestart/starting flag can NOT be cleared
taosHashClear(pMeta->startInfo.pReadyTaskSet);
@ -360,7 +521,9 @@ void streamMetaCloseImpl(void* arg) {
taosArrayDestroy(pMeta->chkpInUse);
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
// taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
@ -369,6 +532,11 @@ void streamMetaCloseImpl(void* arg) {
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
taosCleanUpScheduler(pMeta->qHandle);
taosMemoryFree(pMeta->qHandle);
bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT;
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
@ -661,6 +829,11 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
if (pMeta == NULL) return 0;
return streamMetaLoadAllTasks(pMeta);
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
@ -728,8 +901,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
} else {
// todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
tdbFree(pKey);
tdbFree(pVal);
taosMemoryFree(pTask);
continue;
}
@ -983,9 +1154,9 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->checkpointingId);
entry.activeCheckpointId = (*pTask)->checkpointingId;
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
}
if ((*pTask)->exec.pWalReader != NULL) {
@ -1028,7 +1199,9 @@ void metaHbToMnode(void* param, void* tmrId) {
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1,};
SRpcMsg msg = {
.info.noResp = 1,
};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1;
@ -1070,8 +1243,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId,
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
streamMetaWLock(pMeta);
@ -1142,4 +1315,19 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock);
}
static void execHelper(struct SSchedMsg* pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*)pSchedMsg->msg = code;
}
}
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code) {
SSchedMsg schedMsg = {0};
schedMsg.fp = execHelper;
schedMsg.ahandle = fn;
schedMsg.thandle = param;
schedMsg.msg = code;
return taosScheduleTask(pMeta->qHandle, &schedMsg);
}

View File

@ -32,6 +32,7 @@ typedef struct SBackendFileItem {
char* name;
int8_t type;
int64_t size;
int8_t ref;
} SBackendFileItem;
typedef struct SBackendFile {
char* pCurrent;
@ -40,7 +41,28 @@ typedef struct SBackendFile {
SArray* pSst;
char* pCheckpointMeta;
char* path;
} SBanckendFile;
typedef struct SBackendSnapFiles2 {
char* pCurrent;
char* pMainfest;
char* pOptions;
SArray* pSst;
char* pCheckpointMeta;
char* path;
int64_t checkpointId;
int64_t seraial;
int64_t offset;
TdFilePtr fd;
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
SStreamTaskSnap snapInfo;
int8_t inited;
} SBackendSnapFile2;
struct SStreamSnapHandle {
void* handle;
SBanckendFile* pBackendFile;
@ -51,12 +73,19 @@ struct SStreamSnapHandle {
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
char* metaPath;
SArray* pDbSnapSet;
int32_t currIdx;
int8_t delFlag; // 0 : not del, 1: del
};
struct SStreamSnapBlockHdr {
int8_t type;
int8_t flag;
int64_t index;
// int64_t streamId;
// int64_t taskId;
SStreamTaskSnap snapInfo;
char name[128];
int64_t totalSize;
int64_t size;
@ -82,7 +111,7 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname)
@ -106,195 +135,205 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
return taosOpenFile(fullname, opt);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later
int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 256);
memcpy(tdir, path, len);
int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t code = 0;
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = taosMemoryCalloc(1, 512);
sprintf(buf + strlen(buf), "[");
int8_t validChkp = 0;
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
if (taosIsDir(tdir)) {
validChkp = 1;
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkp(pMeta, chkpId);
} else {
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
tdir);
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) {
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
}
sprintf(buf + strlen(buf) - 1, "]");
qInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
}
// no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it
if (validChkp == 0) {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
char* chkpdir = taosMemoryCalloc(1, len + 256);
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
taosMemoryFree(tdir);
int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
SBackendFileItem item = {0};
item.ref = 1;
// current
item.name = pSnapFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
tdir = chkpdir;
stInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir);
// mainfest
item.name = pSnapFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
code = streamBackendTriggerChkp(pMeta, tdir);
if (code != 0) {
stError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir);
taosMemoryFree(tdir);
return code;
// options
item.name = pSnapFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
}
pHandle->delFlag = 1;
chkpId = 0;
// meta
item.name = pSnapFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
}
stInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);
TdDirPtr pDir = taosOpenDir(tdir);
return 0;
}
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
if (NULL == pDir) {
stError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
goto _err;
qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
return -1;
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pHandle->pBackendFile = pFile;
pHandle->checkpointId = chkpId;
pHandle->seraial = 0;
pFile->path = tdir;
pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(pDirEntry);
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
pFile->pCurrent = taosStrdup(name);
pSnapFile->pCurrent = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
pFile->pMainfest = taosStrdup(name);
pSnapFile->pMainfest = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pFile->pOptions = taosStrdup(name);
pSnapFile->pOptions = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
pFile->pCheckpointMeta = taosStrdup(name);
pSnapFile->pCheckpointMeta = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_SST) &&
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
char* sst = taosStrdup(name);
taosArrayPush(pFile->pSst, &sst);
taosArrayPush(pSnapFile->pSst, &sst);
}
}
if (qDebugFlag & DEBUG_TRACE) {
char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 64);
sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* name = taosArrayGetP(pFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
sprintf(buf + strlen(buf) - 1, "]");
stInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf);
}
taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) {
stError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir);
code = -1;
tdir = NULL;
goto _err;
}
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
SBackendFileItem item;
// current
item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// mainfest
item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// options
item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
}
// meta
item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
taosArrayPush(list, &item);
}
pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->pFileList = list;
pHandle->seraial = 0;
pHandle->offset = 0;
pHandle->handle = pMeta;
return 0;
}
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
int32_t code = -1;
char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256);
// char idstr[64] = {0};
sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
pSnap->chkpId);
if (!taosIsDir(path)) {
goto _ERROR;
}
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pSnapFile->path = path;
pSnapFile->snapInfo = *pSnap;
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
goto _ERROR;
}
if ((code = snapFileGenMeta(pSnapFile)) != 0) {
goto _ERROR;
}
snapFileDebugInfo(pSnapFile);
path = NULL;
code = 0;
_ERROR:
taosMemoryFree(path);
return code;
}
void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pCheckpointMeta);
taosMemoryFree(pSnap->pCurrent);
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
}
// unite read/write snap file
for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
}
}
taosArrayDestroy(pSnap->pFileList);
taosArrayDestroy(pSnap->pSst);
taosCloseFile(&pSnap->fd);
return;
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later
SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet);
if (code != 0) {
return -1;
}
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(pSnapSet);
pHandle->pDbSnapSet = pDbSnapSet;
pHandle->currIdx = 0;
return 0;
_err:
streamSnapHandleDestroy(pHandle);
taosMemoryFreeClear(tdir);
code = -1;
return code;
}
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile;
if (handle->checkpointId == 0) {
// del tmp dir
if (pFile && taosIsDir(pFile->path)) {
if (handle->delFlag) taosRemoveDir(pFile->path);
if (handle->pDbSnapSet) {
for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i);
snapFileDebugInfo(pSnapFile);
snapFileDestroy(pSnapFile);
}
} else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
taosArrayDestroy(handle->pDbSnapSet);
}
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
taosMemoryFree(pFile->pOptions);
taosMemoryFree(pFile->path);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
taosMemoryFree(sst);
}
taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile);
}
taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd);
taosMemoryFree(handle->metaPath);
return;
}
@ -305,7 +344,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
return TSDB_CODE_OUT_OF_MEMORY;
}
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) {
if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) {
taosMemoryFree(pReader);
return -1;
}
@ -321,34 +360,50 @@ int32_t streamSnapReaderClose(SStreamSnapReader* pReader) {
taosMemoryFree(pReader);
return 0;
}
int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) {
// impl later
int32_t code = 0;
SStreamSnapHandle* pHandle = &pReader->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
int32_t idx = pHandle->currIdx;
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pDbSnapSet, idx);
if (pSnapFile == NULL) {
return 0;
}
SBackendFileItem* item = NULL;
if (pHandle->fd == NULL) {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
_NEXT:
if (pSnapFile->fd == NULL) {
if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) {
if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) {
pHandle->currIdx += 1;
pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
goto _NEXT;
} else {
*ppData = NULL;
*size = 0;
return 0;
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ);
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
}
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
if (nread == -1) {
taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno);
@ -358,44 +413,51 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
pSnapFile->offset += nread;
if (pSnapFile->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pSnapFile->fd);
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
}
} else {
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
pHandle->currFileIdx);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
pSnapFile->currFileIdx);
taosCloseFile(&pSnapFile->fd);
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) {
// finish
if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) {
// skip to next snap set
pHandle->currIdx += 1;
pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
goto _NEXT;
} else {
*ppData = NULL;
*size = 0;
taosMemoryFree(buf);
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ);
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread;
nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
pSnapFile->offset += nread;
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
pHdr->size = nread;
pHdr->type = item->type;
pHdr->totalSize = item->size;
pHdr->snapInfo = pSnapFile->snapInfo;
memcpy(pHdr->name, item->name, strlen(item->name));
pHandle->seraial += nread;
pSnapFile->seraial += nread;
*ppData = buf;
*size = sizeof(SStreamSnapBlockHdr) + nread;
@ -408,101 +470,133 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
if (pWriter == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SStreamSnapHandle* pHandle = &pWriter->handle;
pHandle->currIdx = 0;
pHandle->metaPath = taosStrdup(path);
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pFile->path = taosStrdup(path);
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
SBackendFileItem item;
item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(list, &item);
pHandle->pBackendFile = pFile;
pHandle->pFileList = list;
pHandle->currFileIdx = 0;
pHandle->offset = 0;
pHandle->delFlag = 0;
SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pDbSnapSet, &snapFile);
*ppWriter = pWriter;
return 0;
}
int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
if (a->streamId != b->streamId || a->taskId != b->taskId || a->chkpId != b->chkpId) {
return 0;
}
return 1;
}
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) {
int code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo;
SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
if (pSnapFile->fd == 0) {
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code;
} else {
qInfo("succ to write data %s", pItem->name);
}
pSnapFile->offset += bytes;
} else {
taosCloseFile(&pSnapFile->fd);
pSnapFile->offset = 0;
pSnapFile->currFileIdx += 1;
SBackendFileItem item = {0};
item.name = taosStrdup(pHdr->name);
item.type = pHdr->type;
taosArrayPush(pSnapFile->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
qInfo("succ to write data %s", pItem->name);
pSnapFile->offset += pHdr->size;
}
code = 0;
_EXIT:
return code;
}
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
SStreamTaskSnap snapInfo = pHdr->snapInfo;
if (pHandle->fd == NULL) {
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
if (pDbSnapFile->inited == 0) {
char idstr[64] = {0};
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", snapInfo.chkpId);
if (!taosIsDir(path)) {
code = taosMulMkDir(path);
qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
ASSERT(code == 0);
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
pDbSnapFile->path = path;
pDbSnapFile->snapInfo = snapInfo;
pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pDbSnapFile->currFileIdx = 0;
pDbSnapFile->offset = 0;
SBackendFileItem item = {0};
item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(pDbSnapFile->pFileList, &item);
pDbSnapFile->inited = 1;
return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
} else {
if (snapInfoEqual(&snapInfo, &pDbSnapFile->snapInfo)) {
return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
} else {
SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pDbSnapSet, &snapFile);
pHandle->currIdx += 1;
return streamSnapWrite(pWriter, pData, nData);
}
}
return code;
}
pHandle->offset += bytes;
} else {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
SBackendFileItem item;
item.name = taosStrdup(pHdr->name);
item.type = pHdr->type;
taosArrayPush(pHandle->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size;
}
// impl later
return 0;
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle;
if (qDebugFlag & DEBUG_TRACE) {
char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 64);
int n = sprintf(buf, "[");
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
if (i != taosArrayGetSize(handle->pFileList) - 1) {
n += sprintf(buf + n, "%s %" PRId64 ",", item->name, item->size);
} else {
n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size);
}
}
stDebug("%s snap get file list, %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf);
}
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name);
}
streamSnapHandleDestroy(handle);
if (pWriter == NULL) return 0;
streamSnapHandleDestroy(&pWriter->handle);
taosMemoryFree(pWriter);
return 0;

View File

@ -106,44 +106,14 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
} else {
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
}
pState->taskId = pStreamTask->id.taskId;
pState->streamId = pStreamTask->id.streamId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId);
streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
pState->streamBackendRid = pMeta->streamBackendRid;
// streamMetaWLock(pMeta);
taosThreadMutexLock(&pMeta->backendMutex);
void* uniqueId =
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (uniqueId == NULL) {
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) {
taosThreadMutexUnlock(&pMeta->backendMutex);
taosMemoryFree(pState);
return NULL;
}
taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
&pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
} else {
int64_t id = *(int64_t*)uniqueId;
pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// already exist stream task for
stInfo("already exist stream-state for %s", pState->pTdbState->idstr);
// taosAcquireRef(streamBackendId, pState->streamBackendRid);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
@ -237,6 +207,12 @@ _err:
#endif
}
int32_t streamStateDelTaskDb(SStreamState* pState) {
SStreamTask* pTask = pState->pTdbState->pOwner;
taskDbRemoveRef(pTask->pBackend);
taosMemoryFree(pTask);
return 0;
}
void streamStateClose(SStreamState* pState, bool remove) {
SStreamTask* pTask = pState->pTdbState->pOwner;
#ifdef USE_ROCKSDB
@ -692,8 +668,7 @@ void streamStateResetCur(SStreamStateCur* pCur) {
}
void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur || pCur->buffIndex >= 0) {
taosMemoryFree(pCur);
if (!pCur) {
return;
}
qDebug("streamStateFreeCur");
@ -768,7 +743,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
stDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return deleteRowBuff(pState->pFileState, key, sizeof(SSessionKey));
#else
@ -1122,7 +1097,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
void streamStateDestroy(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
streamFileStateDestroy(pState->pFileState);
streamStateDestroy_rocksdb(pState, remove);
// streamStateDestroy_rocksdb(pState, remove);
tSimpleHashCleanup(pState->parNameMap);
// do nothong
#endif

View File

@ -250,9 +250,8 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
// if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
@ -379,6 +378,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->pState) {
stDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
taskDbRemoveRef(pTask->pBackend);
}
if (pTask->id.idStr != NULL) {
@ -467,6 +468,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
taosThreadMutexInit(&pTask->lock, &attr);
// if (pTask->info.fillHistory == 1) {
// //
// } else {
// }
// if (streamTaskSetDb(pMeta, pTask) != 0) {
// return -1;
// }
streamTaskOpenAllUpstreamInput(pTask);
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));

View File

@ -1499,6 +1499,7 @@ int transSendResponse(const STransMsg* msg) {
}
SExHandle* exh = msg->info.handle;
if (exh == NULL) {
rpcFreeCont(msg->pCont);
return 0;
}
int64_t refId = msg->info.refId;

View File

@ -22,7 +22,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py

View File

@ -31,7 +31,7 @@ class TDTestCase:
tdSql.query("use test")
tdSql.query("create snode on dnode 4")
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream useing snode and insert data ok========")
tdLog.debug("========create stream using snode and insert data ok========")
time.sleep(60)
tdDnodes = cluster.dnodes