fix(stream): set checkpoint id when expanding task.
This commit is contained in:
parent
773ae39d33
commit
2f90a53548
|
@ -41,13 +41,12 @@ typedef struct {
|
||||||
} SLocalFetch;
|
} SLocalFetch;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* tqReader;
|
void* tqReader; // todo remove it
|
||||||
void* config;
|
|
||||||
void* vnode;
|
void* vnode;
|
||||||
void* mnd;
|
void* mnd;
|
||||||
SMsgCb* pMsgCb;
|
SMsgCb* pMsgCb;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
bool initMetaReader;
|
uint64_t checkpointId;
|
||||||
bool initTableReader;
|
bool initTableReader;
|
||||||
bool initTqReader;
|
bool initTqReader;
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
|
|
|
@ -258,9 +258,9 @@ typedef struct SStreamId {
|
||||||
} SStreamId;
|
} SStreamId;
|
||||||
|
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t keptCheckpointId;
|
int64_t checkpointId;
|
||||||
int64_t version; // latest checkpointId version
|
int64_t checkpointVer; // latest checkpointId version
|
||||||
int64_t currentVer; // current offset in WAL, not serialize it
|
int64_t currentVer; // current offset in WAL, not serialize it
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
|
|
||||||
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
|
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
|
||||||
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
|
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -758,12 +758,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->pMeta = pTq->pStreamMeta;
|
pTask->pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
// checkpoint exists, restore from the last checkpoint
|
// checkpoint exists, restore from the last checkpoint
|
||||||
if (pTask->chkInfo.keptCheckpointId != 0) {
|
if (pTask->chkInfo.checkpointId != 0) {
|
||||||
ASSERT(pTask->chkInfo.version > 0);
|
ASSERT(pTask->chkInfo.checkpointVer > 0);
|
||||||
pTask->chkInfo.currentVer = pTask->chkInfo.version;
|
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer;
|
||||||
pTask->dataRange.range.maxVer = pTask->chkInfo.version;
|
pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer;
|
||||||
pTask->dataRange.range.minVer = pTask->chkInfo.version;
|
pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer;
|
||||||
pTask->chkInfo.currentVer = pTask->chkInfo.version;
|
|
||||||
} else {
|
} else {
|
||||||
pTask->chkInfo.currentVer = ver;
|
pTask->chkInfo.currentVer = ver;
|
||||||
pTask->dataRange.range.maxVer = ver;
|
pTask->dataRange.range.maxVer = ver;
|
||||||
|
@ -785,7 +784,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.version = pTask->chkInfo.currentVer,
|
.checkpointId = pTask->chkInfo.checkpointId,
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
.initTqReader = 1,
|
.initTqReader = 1,
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
|
@ -817,7 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.version = pTask->chkInfo.currentVer,
|
.checkpointId = pTask->chkInfo.checkpointId,
|
||||||
.vnode = NULL,
|
.vnode = NULL,
|
||||||
.numOfVgroups = numOfVgroups,
|
.numOfVgroups = numOfVgroups,
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
|
@ -871,12 +870,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
||||||
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
||||||
vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
||||||
|
|
||||||
if (pTask->chkInfo.keptCheckpointId != 0) {
|
if (pTask->chkInfo.checkpointId != 0) {
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1277,7 +1276,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t status = pTask->status.taskStatus;
|
||||||
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.version);
|
pTask->chkInfo.checkpointVer);
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
} else {
|
} else {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
|
@ -295,7 +295,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SEncoder *pCoder = &(SEncoder){0};
|
SEncoder *pCoder = &(SEncoder){0};
|
||||||
SDeleteRes res = {0};
|
SDeleteRes res = {0};
|
||||||
|
|
||||||
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
initStorageAPI(&handle.api);
|
initStorageAPI(&handle.api);
|
||||||
|
|
||||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||||
|
@ -580,7 +580,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
initStorageAPI(&handle.api);
|
initStorageAPI(&handle.api);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
|
|
@ -241,8 +241,8 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
|
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
|
||||||
|
|
||||||
ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
||||||
p->chkInfo.keptCheckpointId = p->checkpointingId;
|
p->chkInfo.checkpointId = p->checkpointingId;
|
||||||
|
|
||||||
int8_t prev = p->status.taskStatus;
|
int8_t prev = p->status.taskStatus;
|
||||||
p->status.taskStatus = TASK_STATUS__NORMAL;
|
p->status.taskStatus = TASK_STATUS__NORMAL;
|
||||||
|
@ -250,7 +250,7 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
streamMetaSaveTask(pMeta, p);
|
streamMetaSaveTask(pMeta, p);
|
||||||
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
|
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
|
||||||
" currentVer:%" PRId64 ", status to be normal, prev:%s",
|
" currentVer:%" PRId64 ", status to be normal, prev:%s",
|
||||||
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer,
|
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
|
||||||
streamGetTaskStatusStr(prev));
|
streamGetTaskStatusStr(prev));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -482,8 +482,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const SStreamQueueItem* pItem = pInput;
|
const SStreamQueueItem* pItem = pInput;
|
||||||
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
|
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
|
||||||
|
|
||||||
int64_t ver = pTask->chkInfo.version;
|
int64_t ver = pTask->chkInfo.checkpointVer;
|
||||||
doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.version, id);
|
doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.checkpointVer, id);
|
||||||
|
|
||||||
int64_t resSize = 0;
|
int64_t resSize = 0;
|
||||||
int32_t totalBlocks = 0;
|
int32_t totalBlocks = 0;
|
||||||
|
@ -494,11 +494,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
resSize / 1048576.0, totalBlocks);
|
resSize / 1048576.0, totalBlocks);
|
||||||
|
|
||||||
// update the currentVer if processing the submit blocks.
|
// update the currentVer if processing the submit blocks.
|
||||||
ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version);
|
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer);
|
||||||
|
|
||||||
if (ver != pTask->chkInfo.version) {
|
if (ver != pTask->chkInfo.checkpointVer) {
|
||||||
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
|
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
|
||||||
pTask->chkInfo.version);
|
pTask->chkInfo.checkpointVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
|
|
|
@ -434,7 +434,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
// remove duplicate
|
// remove duplicate
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
|
|
|
@ -85,8 +85,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->chkInfo.keptCheckpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
|
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
|
||||||
|
@ -148,8 +148,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->chkInfo.keptCheckpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
|
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
|
||||||
|
|
Loading…
Reference in New Issue