fix(stream): add flag to disable the concurrently started consensus-checkpointId procedure.
This commit is contained in:
parent
47b0a0464e
commit
c35c634977
|
@ -272,9 +272,8 @@ typedef struct SCheckpointInfo {
|
||||||
int64_t checkpointTime; // latest checkpoint time
|
int64_t checkpointTime; // latest checkpoint time
|
||||||
int64_t processedVer;
|
int64_t processedVer;
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
|
|
||||||
SActiveCheckpointInfo* pActiveInfo;
|
|
||||||
int64_t msgVer;
|
int64_t msgVer;
|
||||||
|
SActiveCheckpointInfo* pActiveInfo;
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
|
@ -289,6 +288,7 @@ typedef struct SStreamStatus {
|
||||||
int32_t inScanHistorySentinel;
|
int32_t inScanHistorySentinel;
|
||||||
bool appendTranstateBlock; // has append the transfer state data block already
|
bool appendTranstateBlock; // has append the transfer state data block already
|
||||||
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
||||||
|
bool sendConsensusChkptId;
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
||||||
typedef struct SDataRange {
|
typedef struct SDataRange {
|
||||||
|
|
|
@ -1107,6 +1107,17 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
if (pTask->status.sendConsensusChkptId == true) {
|
||||||
|
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
pTask->status.sendConsensusChkptId = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
ASSERT(pTask->pBackend == NULL);
|
ASSERT(pTask->pBackend == NULL);
|
||||||
|
|
||||||
SRestoreCheckpointInfo req = {
|
SRestoreCheckpointInfo req = {
|
||||||
|
|
|
@ -1198,7 +1198,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
||||||
code = pMeta->expandTaskFn(pTask);
|
code = pMeta->expandTaskFn(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
||||||
|
@ -1392,17 +1392,24 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
if (pStartInfo->startAllTasks != 1) {
|
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
int64_t el = endTs - startTs;
|
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
||||||
stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||||
pMeta->vgId, taskId, ready, el);
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
// clear the send consensus-checkpointId flag
|
||||||
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
taosThreadMutexLock(&(*p)->lock);
|
||||||
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
(*p)->status.sendConsensusChkptId = false;
|
||||||
|
taosThreadMutexUnlock(&(*p)->lock);
|
||||||
|
|
||||||
|
if (pStartInfo->startAllTasks != 1) {
|
||||||
|
int64_t el = endTs - startTs;
|
||||||
|
stDebug(
|
||||||
|
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
|
||||||
|
"time:%" PRId64 "ms",
|
||||||
|
pMeta->vgId, taskId, ready, el);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue