enh(stream): add more info in meta table.
This commit is contained in:
parent
31a728b1b8
commit
665107ad3a
|
@ -304,9 +304,9 @@ typedef struct SStreamTaskId {
|
||||||
|
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId; // latest checkpoint id
|
||||||
|
int64_t checkpointVer; // latest checkpoint offset in wal
|
||||||
int64_t checkpointVer; // latest checkpointId version
|
int64_t checkpointTime; // latest checkpoint time
|
||||||
int64_t processedVer;
|
int64_t processedVer;
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t failedId; // record the latest failed checkpoint id
|
int64_t failedId; // record the latest failed checkpoint id
|
||||||
|
@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo {
|
||||||
int64_t created;
|
int64_t created;
|
||||||
int64_t init;
|
int64_t init;
|
||||||
int64_t start;
|
int64_t start;
|
||||||
|
int64_t startCheckpointId;
|
||||||
|
int64_t startCheckpointVer;
|
||||||
|
|
||||||
int64_t step1Start;
|
int64_t step1Start;
|
||||||
double step1El;
|
double step1El;
|
||||||
int64_t step2Start;
|
int64_t step2Start;
|
||||||
|
@ -672,24 +675,34 @@ typedef struct {
|
||||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
||||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||||
|
|
||||||
|
typedef struct STaskCkptInfo {
|
||||||
|
int64_t latestId; // saved checkpoint id
|
||||||
|
int64_t latestVer; // saved checkpoint ver
|
||||||
|
int64_t latestTime; // latest checkpoint time
|
||||||
|
int64_t activeId; // current active checkpoint id
|
||||||
|
int32_t activeTransId; // checkpoint trans id
|
||||||
|
int8_t failed; // denote if the checkpoint is failed or not
|
||||||
|
} STaskCkptInfo;
|
||||||
|
|
||||||
typedef struct STaskStatusEntry {
|
typedef struct STaskStatusEntry {
|
||||||
STaskId id;
|
STaskId id;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
int32_t statusLastDuration; // to record the last duration of current status
|
int32_t statusLastDuration; // to record the last duration of current status
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int64_t verStart; // start version in WAL, only valid for source task
|
SVersionRange verRange; // start/end version in WAL, only valid for source task
|
||||||
int64_t verEnd; // end version in WAL, only valid for source task
|
int64_t processedVer; // only valid for source task
|
||||||
int64_t processedVer; // only valid for source task
|
bool inputQChanging; // inputQ is changing or not
|
||||||
int64_t checkpointId; // current active checkpoint id
|
int64_t inputQUnchangeCounter;
|
||||||
int32_t chkpointTransId; // checkpoint trans id
|
double inputQUsed; // in MiB
|
||||||
int8_t checkpointFailed; // denote if the checkpoint is failed or not
|
double inputRate;
|
||||||
bool inputQChanging; // inputQ is changing or not
|
double sinkQuota; // existed quota size for sink task
|
||||||
int64_t inputQUnchangeCounter;
|
double sinkDataSize; // sink to dst data size
|
||||||
double inputQUsed; // in MiB
|
int64_t startTime;
|
||||||
double inputRate;
|
int64_t startCheckpointId;
|
||||||
double sinkQuota; // existed quota size for sink task
|
int64_t startCheckpointVer;
|
||||||
double sinkDataSize; // sink to dst data size
|
int64_t hTaskId;
|
||||||
|
STaskCkptInfo checkpointInfo;
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
typedef struct SStreamHbMsg {
|
typedef struct SStreamHbMsg {
|
||||||
|
|
|
@ -159,6 +159,8 @@ static const SSysDbTableSchema userStbsSchema[] = {
|
||||||
static const SSysDbTableSchema streamSchema[] = {
|
static const SSysDbTableSchema streamSchema[] = {
|
||||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
@ -182,9 +184,13 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1304,9 +1304,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
||||||
|
|
||||||
|
// create time
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
||||||
|
|
||||||
|
// stream id
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t len = tintToHex(pStream->uid, &buf[4]);
|
||||||
|
buf[2] = '0';
|
||||||
|
buf[3] = 'x';
|
||||||
|
varDataSetLen(buf, len + 2);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
|
||||||
|
// related fill-history stream id
|
||||||
|
memset(buf, 0, tListLen(buf));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pStream->hTaskUid != 0) {
|
||||||
|
len = tintToHex(pStream->hTaskUid, &buf[4]);
|
||||||
|
varDataSetLen(buf, len + 2);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, buf, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// related fill-history stream id
|
||||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
|
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -1469,13 +1491,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
||||||
|
|
||||||
|
// info
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
const char *sinkStr = "%.2fMiB";
|
const char *sinkStr = "%.2fMiB";
|
||||||
sprintf(buf, sinkStr, pe->sinkDataSize);
|
sprintf(buf, sinkStr, pe->sinkDataSize);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// offset info
|
// offset info
|
||||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||||
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
|
sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
STR_TO_VARSTR(vbuf, buf);
|
STR_TO_VARSTR(vbuf, buf);
|
||||||
|
@ -1483,6 +1506,50 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||||
|
|
||||||
|
// start_time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false);
|
||||||
|
|
||||||
|
// start id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false);
|
||||||
|
|
||||||
|
// start ver
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false);
|
||||||
|
|
||||||
|
// checkpoint time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pe->checkpointInfo.latestTime != 0) {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkpoint_id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
|
||||||
|
|
||||||
|
// checkpoint info
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
||||||
|
|
||||||
|
// ds_err_info
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
||||||
|
// history_task_id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
if (pe->hTaskId != 0) {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// history_task_status
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskStatusCopy(pTaskEntry, p);
|
streamTaskStatusCopy(pTaskEntry, p);
|
||||||
if ((p->checkpointId != 0) && p->checkpointFailed) {
|
|
||||||
|
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
|
||||||
|
if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
|
||||||
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
||||||
p->checkpointId, p->chkpointTransId);
|
pChkInfo->activeId, pChkInfo->activeTransId);
|
||||||
|
|
||||||
SFailedCheckpointInfo info = {
|
SFailedCheckpointInfo info = {
|
||||||
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
|
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
|
||||||
addIntoCheckpointList(pFailedTasks, &info);
|
addIntoCheckpointList(pFailedTasks, &info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
if (pChkInfo->checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
|
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
|
||||||
|
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
|
||||||
|
|
||||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
||||||
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -753,6 +753,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
if (pChkInfo->checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
|
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
|
||||||
|
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
||||||
|
|
||||||
pCKInfo->checkpointId = pCKInfo->checkpointingId;
|
pCKInfo->checkpointId = pCKInfo->checkpointingId;
|
||||||
pCKInfo->checkpointVer = pCKInfo->processedVer;
|
pCKInfo->checkpointVer = pCKInfo->processedVer;
|
||||||
|
pCKInfo->checkpointTime = pCKInfo->startTs;
|
||||||
|
|
||||||
streamTaskClearCheckInfo(p, false);
|
streamTaskClearCheckInfo(p, false);
|
||||||
taosThreadMutexUnlock(&p->lock);
|
taosThreadMutexUnlock(&p->lock);
|
||||||
|
|
|
@ -957,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
|
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
|
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
||||||
|
@ -996,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
|
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
|
||||||
|
|
||||||
entry.id.taskId = taskId;
|
entry.id.taskId = taskId;
|
||||||
taosArrayPush(pReq->pTaskStatus, &entry);
|
taosArrayPush(pReq->pTaskStatus, &entry);
|
||||||
|
@ -1102,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
.status = streamTaskGetStatus(*pTask)->state,
|
.status = streamTaskGetStatus(*pTask)->state,
|
||||||
.nodeId = hbMsg.vgId,
|
.nodeId = hbMsg.vgId,
|
||||||
.stage = pMeta->stage,
|
.stage = pMeta->stage,
|
||||||
|
|
||||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||||
|
.startTime = (*pTask)->execInfo.start,
|
||||||
|
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
|
||||||
|
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
|
||||||
|
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
|
||||||
|
.hTaskId = (*pTask)->hTaskInfo.id.taskId,
|
||||||
|
|
||||||
|
.startCheckpointId = (*pTask)->execInfo.startCheckpointId,
|
||||||
|
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
|
||||||
};
|
};
|
||||||
|
|
||||||
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
|
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
|
||||||
|
@ -1112,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->chkInfo.checkpointingId != 0) {
|
if ((*pTask)->chkInfo.checkpointingId != 0) {
|
||||||
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
|
entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
|
||||||
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
|
entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId;
|
||||||
entry.chkpointTransId = (*pTask)->chkInfo.transId;
|
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
||||||
|
|
||||||
if (entry.checkpointFailed) {
|
if (entry.checkpointInfo.failed) {
|
||||||
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1127,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
entry.processedVer = (*pTask)->chkInfo.processedVer;
|
entry.processedVer = (*pTask)->chkInfo.processedVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
|
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
|
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
|
||||||
|
|
|
@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
|
||||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
|
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
||||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||||
|
|
||||||
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
|
|
@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
|
||||||
pDst->inputQUsed = pSrc->inputQUsed;
|
pDst->inputQUsed = pSrc->inputQUsed;
|
||||||
pDst->inputRate = pSrc->inputRate;
|
pDst->inputRate = pSrc->inputRate;
|
||||||
pDst->processedVer = pSrc->processedVer;
|
pDst->processedVer = pSrc->processedVer;
|
||||||
pDst->verStart = pSrc->verStart;
|
pDst->verRange = pSrc->verRange;
|
||||||
pDst->verEnd = pSrc->verEnd;
|
|
||||||
pDst->sinkQuota = pSrc->sinkQuota;
|
pDst->sinkQuota = pSrc->sinkQuota;
|
||||||
pDst->sinkDataSize = pSrc->sinkDataSize;
|
pDst->sinkDataSize = pSrc->sinkDataSize;
|
||||||
pDst->checkpointId = pSrc->checkpointId;
|
pDst->checkpointInfo = pSrc->checkpointInfo;
|
||||||
pDst->checkpointFailed = pSrc->checkpointFailed;
|
pDst->startCheckpointId = pSrc->startCheckpointId;
|
||||||
pDst->chkpointTransId = pSrc->chkpointTransId;
|
pDst->startCheckpointVer = pSrc->startCheckpointVer;
|
||||||
|
|
||||||
|
pDst->startTime = pSrc->startTime;
|
||||||
|
pDst->hTaskId = pSrc->hTaskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
||||||
|
|
Loading…
Reference in New Issue