Merge pull request #24552 from taosdata/fix/3_liaohj

fix(stream): do checkpoint after fill-history task completed.
This commit is contained in:
Haojun Liao 2024-01-25 16:53:11 +08:00 committed by GitHub
commit 834f08a2e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 430 additions and 392 deletions

View File

@ -217,6 +217,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
@ -301,7 +302,6 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG) TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)

View File

@ -535,7 +535,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
SArray* pTaskList, bool hasFillhistory); SArray* pTaskList, bool hasFillhistory);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask, bool metaLock);
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
@ -640,6 +640,7 @@ typedef struct {
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int64_t checkpointId; int64_t checkpointId;
@ -648,6 +649,7 @@ typedef struct {
SEpSet mgmtEps; SEpSet mgmtEps;
int32_t mnodeId; int32_t mnodeId;
int32_t transId; int32_t transId;
int8_t mndTrigger;
int64_t expireTime; int64_t expireTime;
} SStreamCheckpointSourceReq; } SStreamCheckpointSourceReq;
@ -770,6 +772,15 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
@ -792,6 +803,7 @@ SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status); const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask); void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask);
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@ -806,7 +818,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
@ -839,6 +851,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
@ -866,6 +879,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta); void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -223,6 +223,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -84,7 +84,6 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -835,7 +835,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -50,6 +50,7 @@ typedef struct SStreamExecInfo {
SHashObj *pTaskMap; SHashObj *pTaskMap;
SArray *pTaskList; SArray *pTaskList;
TdThreadMutex lock; TdThreadMutex lock;
SHashObj *pTransferStateStreams;
} SStreamExecInfo; } SStreamExecInfo;
#define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CREATE_NAME "stream-create"

View File

@ -17,6 +17,8 @@
#include "mndDef.h" #include "mndDef.h"
#include "mndConsumer.h" #include "mndConsumer.h"
static void *freeStreamTasks(SArray *pTaskLevel);
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
@ -121,11 +123,18 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL; if (pObj->tasks != NULL) {
pObj->tasks = freeStreamTasks(pObj->tasks);
}
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) {
return -1;
}
if (sz != 0) { if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(void *)); pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t innerSz; int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
@ -165,14 +174,15 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0; return 0;
} }
static void *freeStreamTasks(SArray *pTaskLevel) { void *freeStreamTasks(SArray *pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel); int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) { for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i); SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel); int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) { for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j); SStreamTask *pTask = taosArrayGetP(pLevel, j);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, true);
} }
taosArrayDestroy(pLevel); taosArrayDestroy(pLevel);

View File

@ -767,7 +767,7 @@ _OVER:
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER || pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE || pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE ||
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) { pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state)); pMnode->stopped, state.restored, syncStr(state.state));
return -1; return -1;

View File

@ -61,15 +61,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId); int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
@ -85,6 +85,7 @@ static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbNam
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void *); static void freeCheckpointCandEntry(void *);
static void freeTaskList(void *param);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -130,6 +131,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
@ -150,7 +152,10 @@ int32_t mndInitStream(SMnode *pMnode) {
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
if (sdbSetTable(pMnode->pSdb, table) != 0) { if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1; return -1;
@ -166,6 +171,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.transMgmt.pWaitingList); taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosHashCleanup(execInfo.pTransferStateStreams);
taosThreadMutexDestroy(&execInfo.lock); taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup"); mDebug("mnd stream exec info cleanup");
} }
@ -219,11 +225,12 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL; SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
@ -235,16 +242,24 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
} }
pRow = sdbAllocRow(sizeof(SStreamObj)); pRow = sdbAllocRow(sizeof(SStreamObj));
if (pRow == NULL) goto STREAM_DECODE_OVER; if (pRow == NULL) {
goto STREAM_DECODE_OVER;
}
pStream = sdbGetRowObj(pRow); pStream = sdbGetRowObj(pRow);
if (pStream == NULL) goto STREAM_DECODE_OVER; if (pStream == NULL) {
goto STREAM_DECODE_OVER;
}
int32_t tlen; int32_t tlen;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1); buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) goto STREAM_DECODE_OVER; if (buf == NULL) {
goto STREAM_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SDecoder decoder; SDecoder decoder;
@ -260,13 +275,13 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
STREAM_DECODE_OVER: STREAM_DECODE_OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, char* p = (pStream == NULL) ? "null" : pStream->name;
terrstr()); mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return NULL; return NULL;
} }
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
pStream->checkpointId); pStream->checkpointId);
return pRow; return pRow;
} }
@ -980,24 +995,9 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0; return 0;
} }
static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) { static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
SMnode *pMnode = pReq->info.node; int64_t streamId, int32_t taskId, int32_t transId,
SSdb *pSdb = pMnode->pSdb; int8_t mndTrigger) {
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = 0;
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId) {
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId; req.checkpointId = checkpointId;
req.nodeId = nodeId; req.nodeId = nodeId;
@ -1005,6 +1005,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
req.streamId = streamId; // pTask->id.streamId; req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId; req.taskId = taskId; // pTask->id.taskId;
req.transId = transId; req.transId = transId;
req.mndTrigger = mndTrigger;
int32_t code; int32_t code;
int32_t blen; int32_t blen;
@ -1040,14 +1041,16 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return 0; return 0;
} }
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) {
int32_t code = -1; int32_t code = -1;
int64_t timestampMs = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) { if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
// mWarn("checkpoint interval less than the threshold, ignore it");
return -1; return -1;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) { if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId); mndAddtoCheckpointWaitingList(pStream, checkpointId);
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
@ -1093,8 +1096,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id) < 0) { pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
goto _ERR; goto _ERR;
@ -1128,7 +1131,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) {
mError("failed to prepare trans rebalance since %s", terrstr()); mError("failed to prepare checkpoint trans since %s", terrstr());
goto _ERR; goto _ERR;
} }
@ -1138,80 +1141,6 @@ _ERR:
return code; return code;
} }
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) {
taosWLockLatch(&pStream->lock);
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
pTask = taosArrayGetP(pLevel, j);
if (pTask->info.fillHistory == 1) {
continue;
}
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
return -1;
}
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId, pTrans->id) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
return -1;
}
STransAction action = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
}
pStream->checkpointId = chkptId;
pStream->checkpointFreq = taosGetTimestampMs();
pStream->currentTick = 0;
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
return 0;
}
static int32_t initStreamNodeList(SMnode *pMnode) { static int32_t initStreamNodeList(SMnode *pMnode) {
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) { if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
@ -1287,7 +1216,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
if (pEntry->status != TASK_STATUS__READY) { if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status)); pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false; ready = false;
break; break;
} }
@ -1308,9 +1237,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return code; return code;
} }
// make sure the time interval between two consecutive checkpoint trans is long enough
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId); code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId, 1, true);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
if (code == -1) { if (code == -1) {
break; break;
@ -1347,7 +1277,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId); mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId); code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId, 1, true);
mndReleaseStream(pMnode, ps); mndReleaseStream(pMnode, ps);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -2894,10 +2824,10 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId); pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream); code = createStreamResetStatusTrans(pMnode, pStream);
mndReleaseStream(pMnode, pStream);
} }
} }
mndReleaseStream(pMnode, pStream);
return code; return code;
} }
@ -2917,79 +2847,14 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
return NULL; return NULL;
} }
// static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { static int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { int32_t num = 0;
// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
// int32_t numOfReady = 0; SArray* pLevel = taosArrayGetP(pStream->tasks, i);
// int32_t numOfTotal = 0; num += taosArrayGetSize(pLevel);
// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
// if (pTaskEntry->id.streamId == pId->streamId) {
// numOfTotal++;
//
// if (pTaskEntry->id.taskId != pId->taskId) {
// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
// if (pEntry->status == TASK_STATUS__READY) {
// numOfReady++;
// }
// }
// }
// }
//
// if (numOfReady > 0) {
// mDebug("stream:0x%" PRIx64
// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history
// task", pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
// return true;
// } else {
// return false;
// }
// }
// }
//
// return false;
// }
// currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing
// 2. other tasks are in ready state for at least 3 * hb_interval
static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) {
SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream);
if (pTask == NULL) {
mError("failed to get the stream task:0x%x, may have been dropped", (int32_t)pTaskEntry->id.taskId);
return -1;
} }
SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq)); return num;
if (pReq == NULL) {
mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq));
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (hasEpset) {
tmsgSendReq(&epset, &msg);
}
return TSDB_CODE_SUCCESS;
} }
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
@ -3030,14 +2895,33 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
} }
} }
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo* p = taosArrayGet(pList, i);
if (p->transId == pInfo->transId) {
return;
}
}
taosArrayPush(pList, pInfo);
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
bool checkpointFailed = false; // bool checkpointFailed = false;
int64_t checkpointId = 0; // int64_t checkpointId = 0;
int64_t streamId = 0; // int64_t streamId = 0;
int32_t transId = 0; // int32_t transId = 0;
SArray* pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen); tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -3098,19 +2982,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);
if (p->checkpointId != 0) { if (p->checkpointId != 0) {
if (checkpointId != 0) {
ASSERT(checkpointId == p->checkpointId);
} else {
checkpointId = p->checkpointId;
}
if (p->checkpointFailed) { if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId); p->checkpointId, p->chkpointTransId);
checkpointFailed = p->checkpointFailed; SFailedCheckpointInfo info = {
streamId = p->id.streamId; .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
transId = p->chkpointTransId; addIntoCheckpointList(pList, &info);
} }
} }
} }
@ -3129,15 +3007,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans // current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal // kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && checkpointId != 0) { if (taosArrayGetSize(pList) > 0) {
bool allReady = true; bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p); taosArrayDestroy(p);
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", checkpointId); for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
mndResetStatusFromCheckpoint(pMnode, streamId, transId); SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
} else { } else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status"); mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
} }
@ -3146,6 +3029,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req); streamMetaClearHbMsg(&req);
taosArrayDestroy(pList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3154,6 +3038,11 @@ void freeCheckpointCandEntry(void *param) {
taosMemoryFreeClear(pEntry->pName); taosMemoryFreeClear(pEntry->pName);
} }
void freeTaskList(void* param) {
SArray** pList = (SArray **)param;
taosArrayDestroy(*pList);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL; void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -3164,7 +3053,79 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return pStream; return pStream;
} }
sdbRelease(pSdb, pStream);
} }
return NULL; return NULL;
} }
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
int32_t* pId = taosArrayGet(pList, i);
if (taskId == *pId) {
return;
}
}
taosArrayPush(pList, &taskId);
int32_t numOfTasks = taosArrayGetSize(pList);
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
}
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamTaskCheckpointReq req = {0};
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task checkpoint req msg received");
return -1;
}
tDecoderClear(&decoder);
mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t));
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
int64_t checkpointId = mndStreamGenChkpId(pMnode);
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
// TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
// remove this entry
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
}
mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}

View File

@ -235,7 +235,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);

View File

@ -97,7 +97,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
} }
if (pItem->pStreamTask) { if (pItem->pStreamTask) {
tFreeStreamTask(pItem->pStreamTask); tFreeStreamTask(pItem->pStreamTask, true);
} }
taosArrayDestroy(pItem->pResList); taosArrayDestroy(pItem->pResList);
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);

View File

@ -886,7 +886,8 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
pTask->execInfo.step2Start = taosGetTimestampMs(); pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) { if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
pRange->maxVer, 0.0);
streamTaskPutTranstateIntoInputQ(pTask); streamTaskPutTranstateIntoInputQ(pTask);
streamExecTask(pTask); // exec directly streamExecTask(pTask); // exec directly
} else { } else {
@ -1141,8 +1142,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId);
req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
@ -1169,8 +1169,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (req.mndTrigger == 1) {
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId); pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
@ -1182,6 +1183,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
ASSERT(status == TASK_STATUS__HALT);
}
// check if the checkpoint msg already sent or not. // check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
@ -1198,16 +1202,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamProcessCheckpointSourceReq(pTask, &req); streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
int32_t total = 0; qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d",
streamMetaWLock(pMeta); pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
total = pMeta->numOfStreamTasks;
streamMetaWUnLock(pMeta);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1233,35 +1229,3 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
} }
// NOTE: here we may receive this message more than once, so need to handle this case
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
if (pTask->hTaskInfo.id.taskId == 0) {
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
taosThreadMutexLock(&pTask->lock);
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
taosThreadMutexUnlock(&pTask->lock);
// clear the scheduler status
streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -465,7 +465,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) { while (pReader->nextBlk < numOfBlocks) {
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
pReader->nextBlk, numOfBlocks, idstr); (pReader->nextBlk + 1), numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) { if (pReader->tbIdHash == NULL) {

View File

@ -617,7 +617,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
if (code < 0) { if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
tstrerror(code)); tstrerror(code));
tFreeStreamTask(pTask); tFreeStreamTask(pTask, true);
return code; return code;
} }
@ -645,7 +645,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
} }
} else { } else {
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, true);
} }
return code; return code;
@ -663,7 +663,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id; STaskId* pHTaskId = &pTask->hTaskInfo.id;
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId,
(int32_t)pHTaskId->taskId);
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }

View File

@ -600,11 +600,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg); tqProcessTaskResetReq(pVnode->pTq, pMsg);
} }
} break; } break;
case TDMT_STREAM_HTASK_DROP: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange; needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -2142,7 +2142,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
pTSInfo->base.cond.twindows.ekey, id); pTSInfo->base.cond.twindows.ekey, id);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;

View File

@ -56,13 +56,6 @@ struct SStreamTaskSM {
SArray* pWaitingEventList; SArray* pWaitingEventList;
}; };
typedef struct SStreamEventInfo {
EStreamTaskEvent event;
const char* name;
} SStreamEventInfo;
// SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
// void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1768,8 +1768,8 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 1); rocksdb_options_set_info_log_level(opts, 1);
rocksdb_options_set_db_write_buffer_size(opts, 64 << 20); rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
rocksdb_options_set_write_buffer_size(opts, 32 << 20); rocksdb_options_set_write_buffer_size(opts, 128 << 20);
rocksdb_options_set_atomic_flush(opts, 1); rocksdb_options_set_atomic_flush(opts, 1);
pTaskDb->dbOpt = opts; pTaskDb->dbOpt = opts;
@ -1780,6 +1780,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory); rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
pTaskDb->readOpt = rocksdb_readoptions_create(); pTaskDb->readOpt = rocksdb_readoptions_create();
pTaskDb->writeOpt = rocksdb_writeoptions_create(); pTaskDb->writeOpt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);
size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));

View File

@ -36,6 +36,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -50,6 +51,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
@ -151,7 +153,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// todo this status may not be set here. // todo this status may not be set here.
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT(code == TSDB_CODE_SUCCESS);
pTask->chkInfo.transId = pReq->transId; pTask->chkInfo.transId = pReq->transId;
pTask->chkInfo.checkpointingId = pReq->checkpointId; pTask->chkInfo.checkpointingId = pReq->checkpointId;
@ -160,8 +163,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->execInfo.checkpoint += 1; pTask->execInfo.checkpoint += 1;
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return code;
} }
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -459,6 +461,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int64_t startTs = pTask->chkInfo.startTs; int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId; int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
// sink task do not need to save the status, and generated the checkpoint // sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
@ -497,6 +500,21 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
} }
} }
if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) {
// transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint
// free it and remove fill-history task from disk meta-store
taosThreadMutexLock(&pTask->lock);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId);
} else {
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
}
taosThreadMutexUnlock(&pTask->lock);
}
// clear the checkpoint info if failed // clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);

View File

@ -340,7 +340,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else { } else {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
stDebug( stDebug(
"s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer " "s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"exec state", "exec state",
id, el, pStreamTask->id.idStr); id, el, pStreamTask->id.idStr);
} }
@ -380,56 +380,34 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
// 1. expand the query time window for stream task of WAL scanner
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task. // update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d", ", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, p, pStreamTask->status.schedStatus); pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
} else {
stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
// 1. expand the query time window for stream task of WAL scanner
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
} else { } else {
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr); stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
} }
// 2. transfer the ownership of executor state // 2. transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immediately. // 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskResume(pStreamTask); streamTaskSendCheckpointReq(pStreamTask);
// streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); // 4. assign the status to the value that will be kept in disk
// 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 6. open the inputQ for all upstream tasks // 5. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask); streamTaskOpenAllUpstreamInput(pStreamTask);
// 7. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem);
stDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -447,14 +425,24 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task and open inputQ of sink task } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) { if (pStreamTask != NULL) {
streamTaskOpenAllUpstreamInput(pStreamTask); // halt the related stream sink task
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
pStreamTask->id.idStr, tstrerror(code));
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
return code;
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
} }
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamTaskOpenAllUpstreamInput(pStreamTask);
streamTaskSendCheckpointReq(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
}
} }
return code; return code;
@ -718,7 +706,8 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK || return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT); st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
} else { } else {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
st == TASK_STATUS__HALT);
} }
} }
@ -771,8 +760,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref); pTask->status.schedIdleTime, ref);
// add one ref count for task // add one ref count for task
// todo this may be failed, and add ref may be failed. /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask);
SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
if (pTask->schedInfo.pIdleTimer == NULL) { if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer); pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer);
@ -791,18 +779,10 @@ int32_t streamResumeTask(SStreamTask* pTask) {
/*int32_t code = */ doStreamExecTask(pTask); /*int32_t code = */ doStreamExecTask(pTask);
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return 0;
} else {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
clearTaskSchedInfo(pTask);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs()); setLastExecTs(pTask, taosGetTimestampMs());
@ -812,6 +792,14 @@ int32_t streamResumeTask(SStreamTask* pTask) {
pTask->status.schedStatus, pTask->status.lastExecTs); pTask->status.schedStatus, pTask->status.lastExecTs);
return 0; return 0;
} else {
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return 0;
} }
} }

View File

@ -467,7 +467,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
} }
taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap); taosHashClear(pMeta->pTasksMap);
taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->pTaskList);
@ -505,7 +504,9 @@ void streamMetaCloseImpl(void* arg) {
return; return;
} }
streamMetaWLock(pMeta);
streamMetaClear(pMeta); streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
@ -519,7 +520,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->pUpdateTaskSet);
// taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
@ -534,6 +534,8 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt); bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT; pMeta->role = NODE_ROLE_UNINIT;
taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stDebug("end to close stream meta"); stDebug("end to close stream meta");
} }
@ -597,19 +599,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id); taosArrayPush(pMeta->pTaskList, &pTask->id);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
return -1; return -1;
} }
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
return -1; return -1;
} }
@ -647,13 +649,19 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
return p; return p;
} }
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return pTask;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) { if (ref > 0) {
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) { } else if (ref == 0) {
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, true);
} else if (ref < 0) { } else if (ref < 0) {
stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
} }
@ -724,14 +732,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
pTask = *ppTask; pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it // it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 1) {
streamTaskClearHTaskAttr(pTask);
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
if (pTask->info.fillHistory == 1) {
streamTaskClearHTaskAttr(pTask, false);
} }
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
streamMetaRemoveTask(pMeta, &id);
streamMetaWUnLock(pMeta);
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
@ -742,13 +752,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
streamMetaRemoveTask(pMeta, &id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);
} }
streamMetaWUnLock(pMeta);
return 0; return 0;
} }
@ -862,7 +871,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (tDecodeStreamTask(&decoder, pTask) < 0) { if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
doClear(pKey, pVal, pCur, pRecycleList); doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
stError( stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream manually", "stream manually",
@ -873,7 +882,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
STaskId id = streamTaskGetTaskId(pTask); STaskId id = streamTaskGetTaskId(pTask);
taosArrayPush(pRecycleList, &id); taosArrayPush(pRecycleList, &id);
@ -889,7 +898,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
doClear(pKey, pVal, pCur, pRecycleList); doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
return -1; return -1;
} }
@ -903,7 +912,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
doClear(pKey, pVal, pCur, pRecycleList); doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask, false);
return -1; return -1;
} }
@ -1269,11 +1278,11 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {
// pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
// while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
// taosMsleep(100); taosMsleep(100);
// stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
// } }
} }
stDebug("vgId:%d start to check all tasks", vgId); stDebug("vgId:%d start to check all tasks", vgId);
@ -1306,28 +1315,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId); stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock); taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock); int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else { } else {
// stDebug("vgId:%d meta-runlock completed", pMeta->vgId); stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
} }
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock); taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock); taosThreadRwlockUnlock(&pMeta->lock);
} }

View File

@ -1054,6 +1054,24 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0; return 0;
} }
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

View File

@ -340,11 +340,16 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
return 0; return 0;
} }
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask, bool metaLock) {
char* p = NULL; char* p = NULL;
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
STaskExecStatisInfo* pStatis = &pTask->execInfo; STaskExecStatisInfo* pStatis = &pTask->execInfo;
// check for mnode
// if (pTask->pMeta != NULL) {
// streamTaskClearHTaskAttr(pTask, metaLock);
// }
ETaskStatus status1 = TASK_STATUS__UNINIT; ETaskStatus status1 = TASK_STATUS__UNINIT;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
if (pTask->status.pSM != NULL) { if (pTask->status.pSM != NULL) {
@ -733,20 +738,30 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
return status; return status;
} }
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) { int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
return TSDB_CODE_SUCCESS; return 0;
} }
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (metaLock) {
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); streamMetaWLock(pTask->pMeta);
}
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (ppStreamTask != NULL) { if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId); (int32_t)sTaskId.taskId);
taosThreadMutexLock(&(*ppStreamTask)->lock);
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
}
if (metaLock) {
streamMetaWUnLock(pTask->pMeta);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -852,3 +867,41 @@ void streamTaskResume(SStreamTask* pTask) {
bool streamTaskIsSinkTask(const SStreamTask* pTask) { bool streamTaskIsSinkTask(const SStreamTask* pTask) {
return pTask->info.taskLevel == TASK_LEVEL__SINK; return pTask->info.taskLevel == TASK_LEVEL__SINK;
} }
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}

View File

@ -31,9 +31,13 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"}, {.state = TASK_STATUS__CK, .name = "checkpoint"},
// {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
}; };
typedef struct SStreamEventInfo {
EStreamTaskEvent event;
const char* name;
} SStreamEventInfo;
SStreamEventInfo StreamTaskEventList[12] = { SStreamEventInfo StreamTaskEventList[12] = {
{.event = 0, .name = ""}, // dummy event, place holder {.event = 0, .name = ""}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT, .name = "initialize"},
@ -94,7 +98,9 @@ int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
} }
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
@ -402,6 +408,10 @@ SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) {
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment
} }
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
return pTask->status.pSM->prev.state.state;
}
const char* streamTaskGetStatusStr(ETaskStatus status) { const char* streamTaskGetStatusStr(ETaskStatus status) {
return StreamTaskStatusList[status].name; return StreamTaskStatusList[status].name;
} }
@ -497,6 +507,8 @@ void doInitStateTransferTable(void) {
// checkpoint related event // checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);

View File

@ -305,7 +305,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
} }
int32_t walSkipFetchBody(SWalReader *pRead) { int32_t walSkipFetchBody(SWalReader *pRead) {
wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
", applied:%" PRId64 ", 0x%" PRIx64, ", applied:%" PRId64 ", 0x%" PRIx64,
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);

View File

@ -18,6 +18,7 @@ sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream1 trigger at_once fill_history 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); sql create stream stream1 trigger at_once fill_history 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1); sql insert into t1 values(1648791223001,2,2,3,1.1);
@ -224,53 +225,53 @@ endi
# row 2 # row 2
if $data21 != 1 then if $data21 != 1 then
print ======$data21 print ======$data21, expect 1
goto loop01 goto loop01
endi endi
if $data22 != 1 then if $data22 != 1 then
print ======$data22 print ======$data22 , expect 1
goto loop01 goto loop01
endi endi
if $data23 != 3 then if $data23 != 3 then
print ======$data23 print ======$data23 , expect 3
goto loop01 goto loop01
endi endi
if $data24 != 2 then if $data24 != 2 then
print ======$data24 print ======$data24 , expect 2
goto loop01 goto loop01
endi endi
if $data25 != 3 then if $data25 != 3 then
print ======$data25 print ======$data25 , expect 3
goto loop01 goto loop01
endi endi
# row 3 # row 3
if $data31 != 1 then if $data31 != 1 then
print ======$data31 print ======$data31 , expect 1
goto loop01 goto loop01
endi endi
if $data32 != 1 then if $data32 != 1 then
print ======$data32 print ======$data32 , expect 1
goto loop01 goto loop01
endi endi
if $data33 != 4 then if $data33 != 4 then
print ======$data33 print ======$data33 , expect 4
goto loop01 goto loop01
endi endi
if $data34 != 2 then if $data34 != 2 then
print ======$data34 print ======$data34 , expect 2
goto loop01 goto loop01
endi endi
if $data35 != 3 then if $data35 != 3 then
print ======$data35 print ======$data35 , expect 3
goto loop01 goto loop01
endi endi

View File

@ -24,6 +24,7 @@ from util.dnodes import tdDnodes
from util.dnodes import * from util.dnodes import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'debugflag':0,'stdebugFlag': 143 ,"tqDebugflag":135}
def init(self, conn, logSql, replicaVar): def init(self, conn, logSql, replicaVar):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)

View File

@ -6,8 +6,8 @@ from util.cases import *
from util.common import * from util.common import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, updatecfgDict = {'debugFlag':0, 'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0, 'stdebugflag':135} 'asynclog': 0, 'stdebugflag':143}
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)