fix(stream): scheduler the checkpoint generating.

This commit is contained in:
Haojun Liao 2023-11-22 15:17:21 +08:00
commit c96f7cf182
17 changed files with 517 additions and 301 deletions

View File

@ -186,6 +186,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)

View File

@ -150,19 +150,6 @@ typedef struct {
int32_t colNum;
} SMetaStbStats;
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
// bool tqCurrentBlockConsumed(const STqReader* pReader);
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t
// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext
// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
typedef struct TsdReader {
@ -197,27 +184,6 @@ typedef struct SStoreCacheReader {
// clang-format on
/*------------------------------------------------------------------------------------------------------------------*/
/*
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader* pReader);
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr);
STqReader *tqReaderOpen(void *pVnode);
void tqReaderClose(STqReader *);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
SWalReader* tqGetWalReader(STqReader* pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
*/
// todo rename
typedef struct SStoreTqReader {
struct STqReader* (*tqReaderOpen)();
@ -281,28 +247,18 @@ typedef struct SStoreMeta {
void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(
void* pVnode, int64_t suid,
SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list);
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
void* storeGetVersionRange;
void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
// db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(
void* pVnode, int64_t uid, int64_t* numOfTables,
int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);

View File

@ -650,7 +650,6 @@ typedef struct SStreamConf {
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
SRWLatch lock;
// create info

View File

@ -22,23 +22,49 @@
extern "C" {
#endif
typedef struct SStreamTransInfo {
int64_t startTime;
int32_t transId;
const char *name;
} SStreamTransInfo;
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
char* pName;
int64_t streamId;
int64_t checkpointTs;
int64_t checkpointId;
} SCheckpointCandEntry;
typedef struct SStreamTransMgmt {
SHashObj *pDBTrans;
SHashObj *pWaitingList; // stream id list, of which timed checkpoint failed to be issued due to the trans conflict.
} SStreamTransMgmt;
typedef struct SStreamExecInfo {
SArray *pNodeList;
int64_t ts; // snapshot ts
SStreamTransMgmt transMgmt;
int64_t activeCheckpoint; // active check point id
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw);
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
bool mndStreamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb);
int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);

View File

@ -146,6 +146,15 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
}
}
static void mndStreamCheckpointRemain(SMnode* pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, 0);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void mndStreamCheckNode(SMnode* pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
@ -286,6 +295,10 @@ static void *mndThreadFp(void *param) {
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % 5 == 0) {
mndStreamCheckpointRemain(pMnode);
}
if (sec % tsStreamNodeCheckInterval == 0) {
mndStreamCheckNode(pMnode);
}

View File

@ -16,15 +16,11 @@
#include "mndStream.h"
#include "audit.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "osMemory.h"
#include "parser.h"
@ -34,7 +30,13 @@
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
typedef struct SNodeEntry {
int32_t nodeId;
@ -43,22 +45,13 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamExecInfo {
SArray * pNodeList;
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
static SStreamExecInfo execInfo;
SStreamExecInfo execInfo;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
@ -67,6 +60,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq);
static int32_t mndProcessStreamHb(SRpcMsg *pReq);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
@ -83,16 +77,26 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void*);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw);
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -129,6 +133,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
@ -142,8 +147,13 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
taosThreadMutexInit(&execInfo.lock, NULL);
execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
@ -322,9 +332,9 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->conf.trigger;
@ -360,7 +370,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = triggerType == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.watermark = watermark,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
@ -745,11 +755,38 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
return 0;
}
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_TOO_MANY_STREAMS;
}
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj * pStream = NULL;
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0};
@ -757,6 +794,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
#ifdef WINDOWS
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
@ -797,43 +835,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
{
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
code = checkForNumOfStreams(pMnode, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
goto _OVER;
}
break;
}
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
if (pStream->targetStbUid == streamObj.targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
}
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
if (pTrans == NULL) {
@ -891,7 +896,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
mDebug("stream tasks register into node list");
keepStreamTasksInBuf(&streamObj, &execInfo);
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
@ -910,6 +915,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
@ -959,6 +965,22 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = 0;
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) {
SStreamCheckpointSourceReq req = {0};
@ -1009,8 +1031,20 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) return -1;
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
pStream->name, pStream->uid);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
return -1;
}
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
@ -1020,19 +1054,21 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
}
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
taosWLockLatch(&pStream->lock);
pStream->currentTick = 1;
// 1. redo action: broadcast checkpoint source msg for all source vg
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask *p = taosArrayGetP(pLevel, 0);
if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
@ -1048,15 +1084,12 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
goto _ERR;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
STransAction act = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (mndTransAppendRedoAction(pTrans, &act) != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
@ -1064,34 +1097,25 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
}
}
}
// 2. reset tick
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
pStream->currentTick = 0;
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
if ((code = mndPersistTransLog(pStream, pTrans)) != TSDB_CODE_SUCCESS) {
return code;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
code = 0;
_ERR:
mndTransDrop(pTrans);
@ -1181,27 +1205,11 @@ static int32_t initStreamNodeList(SMnode *pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}
static const char *mndGetStreamDB(SMnode *pMnode) {
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void * pIter = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
return NULL;
}
const char *p = taosStrdup(pStream->sourceDb);
mndReleaseStream(pMnode, pStream);
sdbCancelFetch(pSdb, pIter);
return p;
}
static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
int32_t numOfNodes = initStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
@ -1243,18 +1251,19 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
static int32_t mndCheckNodeStatus(SMnode *pMnode) {
bool ready = true;
// check if the node update happens or not
int64_t ts = taosGetTimestampSec();
bool updated = taskNodeIsUpdated(pMnode);
if (updated) return -1;
if (taskNodeIsUpdated(pMnode)) {
return -1;
}
taosThreadMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
@ -1267,32 +1276,73 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
break;
}
}
taosThreadMutexUnlock(&execInfo.lock);
if (!ready) {
return -1;
}
return 0;
return ready? 0:-1;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
return code;
}
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId);
sdbRelease(pSdb, pStream);
if (code == -1) {
break;
}
}
return code;
}
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
int32_t code = 0;
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pWaitingList);
taosThreadMutexUnlock(&execInfo.lock);
if (num == 0) {
return code;
}
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
return code;
}
SArray *pList = taosArrayInit(4, sizeof(int64_t));
while ((pIter = taosHashIterate(execInfo.transMgmt.pWaitingList, pIter)) != NULL) {
SCheckpointCandEntry *pEntry = pIter;
SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName);
mDebug("start to launch checkpoint for stream:%s %"PRIx64" in candidate list", pEntry->pName, pEntry->streamId);
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId);
mndReleaseStream(pMnode, ps);
if (code == TSDB_CODE_SUCCESS) {
taosArrayPush(pList, &pEntry->streamId);
}
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int64_t *pId = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pWaitingList, &pId, sizeof(*pId));
}
int32_t remain = taosHashGetSize(execInfo.transMgmt.pWaitingList);
mDebug("%d in candidate list generated checkpoint, remaining:%d", (int32_t)taosArrayGetSize(pList), remain);
taosArrayDestroy(pList);
return code;
}
@ -1328,7 +1378,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1336,7 +1393,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1346,6 +1403,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
@ -1796,6 +1855,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
@ -1804,7 +1870,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
@ -1818,7 +1884,9 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
// pause all tasks
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb);
// if nodeUpdate happened, not send pause trans
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1922,13 +1990,21 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1937,6 +2013,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb);
// resume all tasks
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
@ -2201,7 +2279,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans
if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, "stream-task-update");
pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
if (pTrans == NULL) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
@ -2311,7 +2389,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
break;
}
keepStreamTasksInBuf(pStream, &execInfo);
saveStreamTasksInfo(pStream, &execInfo);
sdbRelease(pSdb, pStream);
}
}
@ -2394,6 +2472,18 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
return 0;
}
// kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) {
void* pIter = NULL;
while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char* pDb = (char*) pIter;
size_t len = 0;
void* pKey = taosHashGetKey(pDb, &len);
killActiveCheckpointTrans(pMnode, pKey, len);
}
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2435,7 +2525,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
killAllCheckpointTrans(pMnode, &changeInfo);
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot
@ -2481,7 +2572,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
@ -2524,8 +2615,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
@ -2536,15 +2628,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -2559,7 +2651,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
}
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
@ -2624,43 +2716,36 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
if (transId == 0) {
mDebug("failed to find the checkpoint trans, reset not executed");
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
pTrans = mndAcquireTrans(pMnode, transId);
mInfo("kill checkpoint trans:%d", transId);
// not checkpoint trans, ignore
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
return TSDB_CODE_SUCCESS;
}
STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
doKillActiveCheckpointTrans(pMnode);
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans* pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb * pSdb = pMnode->pSdb;
@ -2672,7 +2757,13 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
break;
}
// todo this transaction should exist be only one
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb);
continue;
}
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
@ -2798,7 +2889,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
mndResetStatusFromCheckpoint(pMnode, activeCheckpointId);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}
@ -2810,3 +2901,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}
void freeCheckpointCandEntry(void* param) {
SCheckpointCandEntry* pEntry = param;
taosMemoryFreeClear(pEntry->pName);
}

View File

@ -0,0 +1,123 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndTrans.h"
#include "mndStream.h"
typedef struct SKeyInfo {
void* pKey;
int32_t keyLen;
} SKeyInfo;
static int32_t clearFinishedTrans(SMnode* pMnode);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) {
SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName};
taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo));
if (strcmp(pSrcDb, pDstDb) != 0) {
taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo));
}
return 0;
}
int32_t clearFinishedTrans(SMnode* pMnode) {
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
size_t keyLen = 0;
taosThreadMutexLock(&execInfo.lock);
void* pIter = NULL;
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
// let's clear the finished trans
if (pTrans == NULL) {
void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
pEntry->startTime);
taosArrayPush(pList, &info);
} else {
mndReleaseTrans(pMnode, pTrans);
}
}
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SKeyInfo* pKey = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
}
mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
taosThreadMutexUnlock(&execInfo.lock);
terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList);
return 0;
}
bool mndStreamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) {
clearFinishedTrans(pMnode);
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return false;
}
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
return true;
}
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
return true;
}
taosThreadMutexUnlock(&execInfo.lock);
return false;
}
int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) {
SCheckpointCandEntry* pEntry = taosHashGet(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid));
if (pEntry == NULL) {
SCheckpointCandEntry entry = {.streamId = pStream->uid,
.checkpointTs = taosGetTimestampMs(),
.checkpointId = checkpointId,
.pName = taosStrdup(pStream->name)};
taosHashPut(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid), &entry, sizeof(entry));
int32_t size = taosHashGetSize(execInfo.transMgmt.pWaitingList);
mDebug("stream:%" PRIx64 " add into waiting list due to conflict, ts:%" PRId64 ", total in waitingList:%d",
pStream->uid, entry.checkpointTs, size);
} else {
mDebug("stream:%" PRIx64 " ts:%" PRId64 "already in waiting list, no need to add into", pStream->uid,
pEntry->checkpointTs);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -1152,16 +1152,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
// NOTE: in case of stream task, scan-history data in wal is not allowed to pause
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",

View File

@ -212,8 +212,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
return TSDB_CODE_SUCCESS;
}
@ -255,7 +257,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return -1;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks,
alreadyRestored);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID;

View File

@ -572,8 +572,13 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
if (isEmptyQueryTimeWindow(&w)) {
k += 1;
if (k >= numOfTables) {
break;
} else {
continue;
}
}
// 1. time range check
if (pRecord->firstKey > w.ekey || pRecord->lastKey < w.skey) {

View File

@ -81,7 +81,8 @@ struct STokenBucket {
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double quotaRemain; // not consumed bytes per second
double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp
int64_t tokenFillTimestamp; // fill timestamp
int64_t quotaFillTimestamp; // fill timestamp
};
struct SStreamQueue {

View File

@ -1102,6 +1102,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch;
#if 0
// for test purpose, build the failure case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) {
pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED;
}
#endif
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
@ -1143,10 +1150,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the agg task failure, add test case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER &&
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
", set the current checkpoint failed, and send rsp to mnode",
id, pTask->chkInfo.checkpointingId);
{ // send checkpoint failure msg to mnode directly
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; // record the latest failed checkpoint id
pTask->chkInfo.checkpointingId = pTask->chkInfo.checkpointingId;
streamTaskSendCheckpointSourceRsp(pTask);
}
} else {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
}
}
int32_t leftRsp = 0;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {

View File

@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs();
pBucket->tokenFillTimestamp = taosGetTimestampMs();
pBucket->quotaFillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS;
}
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
ASSERT(pBucket->numOfToken >= 0);
int32_t incNum = (delta / 1000.0) * pBucket->numRate;
int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
if (incNum > 0) {
pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->fillTimestamp = now;
pBucket->tokenFillTimestamp = now;
}
// increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->quotaRate;
int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now;
pBucket->quotaFillTimestamp = now;
}
if (incNum > 0 || incSize > 0) {
stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id);
stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
}
}

View File

@ -168,7 +168,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
} else if (level == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
streamTaskEnablePause(pTask);
}
} else if (level == TASK_LEVEL__SINK) {
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
@ -345,7 +344,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
}
streamTaskEnablePause(pTask);
return TSDB_CODE_SUCCESS;
}
@ -659,9 +657,6 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
streamMetaCommit(pMeta);
streamMetaWUnLock(pMeta);
// history data scan in the stream time window finished, now let's enable the pause
streamTaskEnablePause(pTask);
// for source tasks, let's continue execute.
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask);
@ -1040,11 +1035,6 @@ void streamTaskResume(SStreamTask* pTask) {
}
}
void streamTaskEnablePause(SStreamTask* pTask) {
stDebug("s-task:%s enable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 1;
}
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;

View File

@ -1061,7 +1061,6 @@ _end:
}
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
stDebug("try to write to cf parname");
#ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {

View File

@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
taosMsleep(100);
} else {
// no active event trans exists, handle this event directly
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
@ -451,60 +452,43 @@ int32_t initStateTransferTable() {
return TSDB_CODE_SUCCESS;
}
//clang-format off
void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT,
streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL,
NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL,
NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
&info, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans =
createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// pause & resume related event handle
@ -572,3 +556,4 @@ void doInitStateTransferTable(void) {
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
}
//clang-format on