Merge pull request #23785 from taosdata/fix/liaohj

other: merge fix from 3.0 to main.
This commit is contained in:
Haojun Liao 2023-11-24 14:43:02 +08:00 committed by GitHub
commit d51baab9c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 340 additions and 234 deletions

View File

@ -3112,7 +3112,7 @@ typedef struct {
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeSMDropStreamReq(SMDropStreamReq* pReq);
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -150,19 +150,6 @@ typedef struct {
int32_t colNum;
} SMetaStbStats;
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
// bool tqCurrentBlockConsumed(const STqReader* pReader);
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t
// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext
// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
typedef struct TsdReader {
@ -197,27 +184,6 @@ typedef struct SStoreCacheReader {
// clang-format on
/*------------------------------------------------------------------------------------------------------------------*/
/*
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader* pReader);
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr);
STqReader *tqReaderOpen(void *pVnode);
void tqReaderClose(STqReader *);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
SWalReader* tqGetWalReader(STqReader* pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
*/
// todo rename
typedef struct SStoreTqReader {
struct STqReader* (*tqReaderOpen)();
@ -281,28 +247,18 @@ typedef struct SStoreMeta {
void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(
void* pVnode, int64_t suid,
SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list);
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
void* storeGetVersionRange;
void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
// db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(
void* pVnode, int64_t uid, int64_t* numOfTables,
int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);

View File

@ -314,7 +314,6 @@ typedef struct SStreamStatus {
int8_t schedStatus;
int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t timerActive; // timer is active
int32_t inScanHistorySentinel;
} SStreamStatus;
@ -787,7 +786,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);

View File

@ -7152,7 +7152,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
return 0;
}
void tFreeSMDropStreamReq(SMDropStreamReq *pReq) {
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
FREESQL();
}

View File

@ -649,8 +649,7 @@ typedef struct SStreamConf {
} SStreamConf;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
char name[TSDB_STREAM_FNAME_LEN];
SRWLatch lock;
// create info

View File

@ -22,17 +22,37 @@
extern "C" {
#endif
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
typedef struct SStreamTransInfo {
int64_t startTime;
int32_t transId;
const char *name;
} SStreamTransInfo;
typedef struct SStreamTransMgmt {
SHashObj *pDBTrans;
} SStreamTransMgmt;
typedef struct SStreamExecInfo {
SArray *pNodeList;
int64_t ts; // snapshot ts
SStreamTransMgmt transMgmt;
int64_t activeCheckpoint; // active check point id
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb);
// for sma
// TODO refactor

View File

@ -28,7 +28,7 @@
extern bool tsDeployOnSnode;
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
SEpSet* pEpset, bool isFillhistory);
SEpSet* pEpset, bool isFillhistory);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {

View File

@ -34,7 +34,13 @@
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
typedef struct SNodeEntry {
int32_t nodeId;
@ -43,22 +49,13 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamExecInfo {
SArray *pNodeList;
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
static SStreamExecInfo execInfo;
static int32_t mndNodeCheckSentinel = 0;
SStreamExecInfo execInfo;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
@ -83,17 +80,20 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -133,8 +133,11 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
taosThreadMutexInit(&execInfo.lock, NULL);
execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
return sdbSetTable(pMnode->pSdb, table);
}
@ -142,6 +145,7 @@ int32_t mndInitStream(SMnode *pMnode) {
void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execInfo.pTaskList);
taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}
@ -335,7 +339,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = triggerType == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.watermark = watermark,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
@ -720,6 +724,34 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
return 0;
}
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_TOO_MANY_STREAMS;
}
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
@ -732,6 +764,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
#ifdef WINDOWS
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
@ -772,42 +805,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
{
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
goto _OVER;
}
break;
}
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
if (pStream->targetStbUid == streamObj.targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
}
code = checkForNumOfStreams(pMnode, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
goto _OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
@ -866,7 +866,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
mDebug("stream tasks register into node list");
keepStreamTasksInBuf(&streamObj, &execInfo);
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
@ -893,7 +893,6 @@ _OVER:
}
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
if(sql != NULL){
@ -1268,7 +1267,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
@ -1277,7 +1276,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, "checkpoint");
mndTransSetDbName(pTrans, pDb, pDb);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb);
taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1329,46 +1329,56 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (dropReq.igNotExists) {
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
mError("stream:%s not exist failed to drop", dropReq.name);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1376,7 +1386,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1384,7 +1394,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1392,13 +1402,12 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
// reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
@ -1814,6 +1823,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
@ -1822,7 +1838,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
@ -1836,7 +1852,9 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
// pause all tasks
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb);
// if nodeUpdate happened, not send pause trans
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1940,13 +1958,21 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1955,6 +1981,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb);
// resume all tasks
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
@ -2219,7 +2247,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans
if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, "stream-task-update");
pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
if (pTrans == NULL) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
@ -2329,7 +2357,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
break;
}
keepStreamTasksInBuf(pStream, &execInfo);
saveStreamTasksInfo(pStream, &execInfo);
sdbRelease(pSdb, pStream);
}
}
@ -2413,6 +2441,17 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
return 0;
}
static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) {
void* pIter = NULL;
while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char* pDb = (char*) pIter;
size_t len = 0;
void* pKey = taosHashGetKey(pDb, &len);
killActiveCheckpointTrans(pMnode, pKey, len);
}
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2454,7 +2493,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
killAllCheckpointTrans(pMnode, &changeInfo);
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot
@ -2500,7 +2540,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
@ -2543,8 +2583,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
@ -2555,15 +2596,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -2578,7 +2619,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
}
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
@ -2642,43 +2683,36 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
if (transId == 0) {
mDebug("failed to find the checkpoint trans, reset not executed");
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
pTrans = mndAcquireTrans(pMnode, transId);
mInfo("kill checkpoint trans:%d", transId);
// not checkpoint trans, ignore
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
return TSDB_CODE_SUCCESS;
}
STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
doKillActiveCheckpointTrans(pMnode);
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans* pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb * pSdb = pMnode->pSdb;
@ -2690,7 +2724,13 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
break;
}
// todo this transaction should exist be only one
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb);
continue;
}
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
@ -2817,7 +2857,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
mndResetStatusFromCheckpoint(pMnode, activeCheckpointId);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}

View File

@ -0,0 +1,105 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndTrans.h"
#include "mndStream.h"
typedef struct SKeyInfo {
void* pKey;
int32_t keyLen;
} SKeyInfo;
static int32_t clearFinishedTrans(SMnode* pMnode);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) {
SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName};
taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo));
if (strcmp(pSrcDb, pDstDb) != 0) {
taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo));
}
return 0;
}
int32_t clearFinishedTrans(SMnode* pMnode) {
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
size_t keyLen = 0;
taosThreadMutexLock(&execInfo.lock);
void* pIter = NULL;
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
// let's clear the finished trans
if (pTrans == NULL) {
void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
pEntry->startTime);
taosArrayPush(pList, &info);
} else {
mndReleaseTrans(pMnode, pTrans);
}
}
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SKeyInfo* pKey = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
}
mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
taosThreadMutexUnlock(&execInfo.lock);
terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList);
return 0;
}
bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) {
clearFinishedTrans(pMnode);
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return false;
}
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true;
}
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true;
}
taosThreadMutexUnlock(&execInfo.lock);
return false;
}

View File

@ -1139,16 +1139,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
// NOTE: in case of stream task, scan-history data in wal is not allowed to pause
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",

View File

@ -7958,7 +7958,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
tNameGetFullDbName(&name, dropReq.name);
dropReq.igNotExists = pStmt->ignoreNotExists;
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return code;
}

View File

@ -1102,6 +1102,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch;
#if 0
// for test purpose, build the failure case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) {
pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED;
}
#endif
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
@ -1143,8 +1150,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
// todo handle the agg task failure, add test case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER &&
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
", set the current checkpoint failed, and send rsp to mnode",
id, pTask->checkpointingId);
{ // send checkpoint failure msg to mnode directly
pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id
pTask->checkpointingId = pTask->checkpointingId;
streamTaskSendCheckpointSourceRsp(pTask);
}
} else {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
}
}

View File

@ -169,7 +169,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
} else if (level == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
streamTaskEnablePause(pTask);
}
} else if (level == TASK_LEVEL__SINK) {
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
@ -346,7 +345,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
}
streamTaskEnablePause(pTask);
return TSDB_CODE_SUCCESS;
}
@ -659,9 +657,6 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
streamMetaCommit(pMeta);
streamMetaWUnLock(pMeta);
// history data scan in the stream time window finished, now let's enable the pause
streamTaskEnablePause(pTask);
// for source tasks, let's continue execute.
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask);
@ -1040,11 +1035,6 @@ void streamTaskResume(SStreamTask* pTask) {
}
}
void streamTaskEnablePause(SStreamTask* pTask) {
stDebug("s-task:%s enable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 1;
}
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;

View File

@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
taosMsleep(100);
} else {
// no active event trans exists, handle this event directly
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
@ -451,60 +452,43 @@ int32_t initStateTransferTable() {
return TSDB_CODE_SUCCESS;
}
//clang-format off
void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT,
streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL,
NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL,
NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
&info, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans =
createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// pause & resume related event handle
@ -571,4 +555,5 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
}
}
//clang-format on

View File

@ -16,9 +16,8 @@ sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sleep 1000
sleep 2000
sleep 1000
sql pause stream streams1;
sql insert into ts1 values(1648791213001,1,12,3,1.0);