diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c12bb146b4..5cecb1af42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,9 +304,9 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t startTs; - int64_t checkpointId; - - int64_t checkpointVer; // latest checkpointId version + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id @@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo { int64_t created; int64_t init; int64_t start; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t step1Start; double step1El; int64_t step2Start; @@ -672,24 +675,34 @@ typedef struct { int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not +} STaskCkptInfo; + typedef struct STaskStatusEntry { - STaskId id; - int32_t status; - int32_t statusLastDuration; // to record the last duration of current status - int64_t stage; - int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int64_t checkpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - int8_t checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not - int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB - double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; + double inputQUsed; // in MiB + double inputRate; + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 80b3efb05d..9f1509077c 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -159,6 +159,8 @@ static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -182,9 +184,13 @@ static const SSysDbTableSchema streamTaskSchema[] = { // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f9afb2adc..05b06e83a8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1304,9 +1304,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + // create time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + // stream id + char buf[128] = {0}; + int32_t len = tintToHex(pStream->uid, &buf[4]); + buf[2] = '0'; + buf[3] = 'x'; + varDataSetLen(buf, len + 2); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, buf, false); + + // related fill-history stream id + memset(buf, 0, tListLen(buf)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pStream->hTaskUid != 0) { + len = tintToHex(pStream->hTaskUid, &buf[4]); + varDataSetLen(buf, len + 2); + colDataSetVal(pColInfo, numOfRows, buf, false); + } else { + colDataSetVal(pColInfo, numOfRows, buf, true); + } + + // related fill-history stream id char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1469,13 +1491,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); + sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); @@ -1483,6 +1506,50 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + // start_time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false); + + // start id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false); + + // start ver + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false); + + // checkpoint time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->checkpointInfo.latestTime != 0) { + colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // checkpoint_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); + + // checkpoint info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + + // ds_err_info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + + // history_task_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->hTaskId != 0) { + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // history_task_status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index c8f943b931..1fedee3bcf 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } streamTaskStatusCopy(pTaskEntry, p); - if ((p->checkpointId != 0) && p->checkpointFailed) { + + STaskCkptInfo *pChkInfo = &p->checkpointInfo; + if ((pChkInfo->activeId != 0) && pChkInfo->failed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); + pChkInfo->activeId, pChkInfo->activeTransId); SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedTasks, &info); } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index bd07974c3f..f17716eda0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -63,6 +63,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fb47414fb9..92dc55c0c3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -753,6 +753,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f52c5d2f0..86ee2b837d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { pCKInfo->checkpointId = pCKInfo->checkpointingId; pCKInfo->checkpointVer = pCKInfo->processedVer; + pCKInfo->checkpointTime = pCKInfo->startTs; streamTaskClearCheckInfo(p, false); taosThreadMutexUnlock(&p->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ea18e791a6..3c22f33f93 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -957,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; - if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; + if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; } int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); @@ -996,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; + + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -1102,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), + .startTime = (*pTask)->execInfo.start, + .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, + .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, + .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, + .hTaskId = (*pTask)->hTaskInfo.id.taskId, + + .startCheckpointId = (*pTask)->execInfo.startCheckpointId, + .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, }; entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); @@ -1112,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; - entry.checkpointId = (*pTask)->chkInfo.checkpointingId; - entry.chkpointTransId = (*pTask)->chkInfo.transId; + entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; + entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId; + entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; - if (entry.checkpointFailed) { + if (entry.checkpointInfo.failed) { stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); } } @@ -1127,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.processedVer = (*pTask)->chkInfo.processedVer; } - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); + walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); } addUpdateNodeIntoHbMsg(*pTask, &hbMsg); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index f2a694a554..32bd3742ad 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c34e162326..7badbfa9f3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->inputQUsed = pSrc->inputQUsed; pDst->inputRate = pSrc->inputRate; pDst->processedVer = pSrc->processedVer; - pDst->verStart = pSrc->verStart; - pDst->verEnd = pSrc->verEnd; + pDst->verRange = pSrc->verRange; pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkDataSize = pSrc->sinkDataSize; - pDst->checkpointId = pSrc->checkpointId; - pDst->checkpointFailed = pSrc->checkpointFailed; - pDst->chkpointTransId = pSrc->chkpointTransId; + pDst->checkpointInfo = pSrc->checkpointInfo; + pDst->startCheckpointId = pSrc->startCheckpointId; + pDst->startCheckpointVer = pSrc->startCheckpointVer; + + pDst->startTime = pSrc->startTime; + pDst->hTaskId = pSrc->hTaskId; } static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {