Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-07-14 18:35:53 +08:00
commit 6798e31bb5
5 changed files with 230 additions and 184 deletions

View File

@ -47,7 +47,7 @@ enum {
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE,
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK_READY,
};
@ -103,7 +103,7 @@ typedef struct {
} SStreamQueueItem;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
typedef struct {
int8_t type;
@ -120,7 +120,7 @@ typedef struct {
} SStreamMergedSubmit;
typedef struct {
int8_t type;
int8_t type;
int32_t srcVgId;
int32_t srcTaskId;
@ -249,7 +249,7 @@ typedef struct SStreamChildEpInfo {
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
} SStreamChildEpInfo;
typedef struct SStreamId {
@ -260,17 +260,17 @@ typedef struct SStreamId {
typedef struct SCheckpointInfo {
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t currentVer; // current offset in WAL, not serialize it
int64_t checkpointVer; // latest checkpointId version
int64_t currentVer; // current offset in WAL, not serialize it
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
} SStreamStatus;
typedef struct SHistDataRange {
@ -311,8 +311,8 @@ struct SStreamTask {
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
// output
union {
@ -533,7 +533,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
@ -541,10 +541,10 @@ int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
@ -556,7 +556,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
void streamPrepareNdoCheckDownstream(SStreamTask* pTask);
@ -600,7 +600,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
@ -616,8 +616,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
#ifdef __cplusplus
}

View File

@ -836,48 +836,48 @@ _OVER:
return code;
}
static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) {
void *buf = NULL;
int32_t tlen = 0;
int32_t checkpointId = tGenIdPI64();
// static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) {
// void *buf = NULL;
// int32_t tlen = 0;
// int32_t checkpointId = tGenIdPI64();
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SArray *stream = taosArrayInit(64, sizeof(void *));
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// SArray *stream = taosArrayInit(64, sizeof(void *));
SListIter iter = {0};
tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD);
SListNode *pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
char streamName[TSDB_STREAM_FNAME_LEN] = {0};
tdListNodeGetData(pStreamList, pNode, streamName);
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
taosArrayPush(stream, &pStream);
}
// SListIter iter = {0};
// tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD);
// SListNode *pNode = NULL;
// while ((pNode = tdListNext(&iter)) != NULL) {
// char streamName[TSDB_STREAM_FNAME_LEN] = {0};
// tdListNodeGetData(pStreamList, pNode, streamName);
// SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
// taosArrayPush(stream, &pStream);
// }
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
for (int i = 0; i < taosArrayGetSize(stream); i++) {
SStreamObj *p = taosArrayGetP(stream, i);
mndReleaseStream(pMnode, p);
}
taosArrayDestroy(stream);
return -1;
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) {
// mndReleaseVgroup(pMnode, pVgObj);
// for (int i = 0; i < taosArrayGetSize(stream); i++) {
// SStreamObj *p = taosArrayGetP(stream, i);
// mndReleaseStream(pMnode, p);
// }
// taosArrayDestroy(stream);
// return -1;
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
}
mndReleaseVgroup(pMnode, pVgObj);
// STransAction action = {0};
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
// action.pCont = buf;
// action.contLen = tlen;
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// }
// mndReleaseVgroup(pMnode, pVgObj);
for (int i = 0; i < taosArrayGetSize(stream); i++) {
SStreamObj *p = taosArrayGetP(stream, i);
mndReleaseStream(pMnode, p);
}
taosArrayDestroy(stream);
return 0;
}
// for (int i = 0; i < taosArrayGetSize(stream); i++) {
// SStreamObj *p = taosArrayGetP(stream, i);
// mndReleaseStream(pMnode, p);
// }
// taosArrayDestroy(stream);
// return 0;
// }
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -979,106 +979,107 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return 0;
}
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
int64_t timestampMs = taosGetTimestampMs();
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
return -1;
}
// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
// int64_t timestampMs = taosGetTimestampMs();
// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
// return -1;
// }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
atomic_store_64(&pStream->currentTick, 1);
taosWLockLatch(&pStream->lock);
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return -1;
}
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
// if (pTrans == NULL) return -1;
// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
// if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name,
// checkpointId,
// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
// mndTransDrop(pTrans);
// return -1;
// }
// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// atomic_store_64(&pStream->currentTick, 1);
// taosWLockLatch(&pStream->lock);
// // 1. redo action: broadcast checkpoint source msg for all source vg
// int32_t totLevel = taosArrayGetSize(pStream->tasks);
// for (int32_t i = 0; i < totLevel; i++) {
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// int32_t sz = taosArrayGetSize(pLevel);
// for (int32_t j = 0; j < sz; j++) {
// SStreamTask *pTask = taosArrayGetP(pLevel, j);
// /*A(pTask->info.nodeId > 0);*/
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
// if (pVgObj == NULL) {
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return -1;
}
// void *buf;
// int32_t tlen;
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
// pTask->id.taskId) < 0) {
// mndReleaseVgroup(pMnode, pVgObj);
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// STransAction action = {0};
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
// action.pCont = buf;
// action.contLen = tlen;
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
mndReleaseVgroup(pMnode, pVgObj);
// mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
}
}
}
// 2. reset tick
pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
// if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// taosMemoryFree(buf);
// taosWUnLockLatch(&pStream->lock);
// mndReleaseStream(pMnode, pStream);
// mndTransDrop(pTrans);
// return -1;
// }
// }
// }
// }
// // 2. reset tick
// pStream->checkpointFreq = checkpointId;
// pStream->checkpointId = checkpointId;
// pStream->checkpointFreq = taosGetTimestampMs();
// atomic_store_64(&pStream->currentTick, 0);
// // 3. commit log: stream checkpoint info
// pStream->version = pStream->version + 1;
// taosWUnLockLatch(&pStream->lock);
// // code condtion
// // // code condtion
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
// if (pCommitRaw == NULL) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
mndTransDrop(pTrans);
return 0;
_ERR:
mndTransDrop(pTrans);
return -1;
}
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// mndTransDrop(pTrans);
// return 0;
// _ERR:
// mndTransDrop(pTrans);
// return -1;
// }
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode,
int64_t checkpointId) {

View File

@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
int32_t tqStreamTasksScanWal(STQ* pTq);
int32_t tqStreamTasksStatusCheck(STQ* pTq);

View File

@ -162,7 +162,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while(1) {
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
@ -207,13 +207,14 @@ void tqNotifyClose(STQ* pTq) {
int64_t st = taosGetTimestampMs();
while(hasStreamTaskInTimer(pTq->pStreamMeta)) {
while (hasStreamTaskInTimer(pTq->pStreamMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, el);
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms",
pTq->pStreamMeta->vgId, el);
}
}
@ -249,8 +250,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset);
tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return 0;
}
@ -419,8 +420,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");
break;
}else{
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
} else {
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
.pCont = pHandle->msg->pCont,
.contLen = pHandle->msg->contLen,
.info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
@ -679,9 +683,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = NULL;
while(1){
while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) {
break;
}
}
@ -697,7 +701,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle);
if(ret < 0){
if (ret < 0) {
tqDestroyTqHandle(&handle);
goto end;
}
@ -739,7 +743,7 @@ end:
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId) {
int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
@ -758,16 +762,16 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMeta = pTq->pStreamMeta;
// checkpoint exists, restore from the last checkpoint
if (pTask->chkInfo.checkpointId != 0) {
ASSERT(pTask->chkInfo.checkpointVer > 0);
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer;
pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer;
pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer;
} else {
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
}
// if (pTask->chkInfo.checkpointId != 0) {
// ASSERT(pTask->chkInfo.checkpointVer > 0);
// pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer;
// pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer;
// pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer;
// } else {
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
//}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pSateTask = pTask;
@ -915,7 +919,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
} else {
rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
@ -1092,15 +1097,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId,
pStreamTask->info.taskLevel, pId);
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, pStreamTask->info.taskLevel,
pId);
// if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2(pTask);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
" do secondary scan-history-data after halt the related stream task:%s",
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
@ -1356,7 +1362,7 @@ int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
if (code != 0) {
return code;
}
@ -1403,8 +1409,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) {
return code;
}

View File

@ -241,9 +241,11 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
int64_t checkpointId = 0;
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver, checkpointId) < 0) {
tFreeStreamTask(pTask);
return -1;
}
@ -404,7 +406,45 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
return 0;
}
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t chkpId = 0;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
return chkpId;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
goto _err;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId);
taosMemoryFree(pTask); // fix mem leak later
}
_err:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return chkpId;
}
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
int64_t checkpointId = streamGetLatestCheckpointId(pMeta);
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
return -1;
@ -417,7 +457,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
@ -434,7 +473,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
// remove duplicate
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer, checkpointId) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);