diff --git a/include/common/cos.h b/include/common/cos.h index 8e48533304..17c48d594b 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); -int32_t s3GetObjectToFile(const char *object_name, char *fileName); +int32_t s3GetObjectToFile(const char *object_name, const char *fileName); #define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4423bf94b6..e3487c49d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,6 +56,7 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_ADD_FAILED_TASK (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -443,6 +444,7 @@ typedef struct SDownstreamStatusInfo { typedef struct STaskCheckInfo { SArray* pList; int64_t startTs; + int64_t timeoutStartTs; int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; @@ -547,7 +549,7 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - void* qHandle; + void* qHandle; // todo remove it void* bkdChkptMgt; } SStreamMeta; @@ -885,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 990bfdcea3..0db6664ab9 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; } +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; } #endif diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 14e8088dfe..bf2f14339d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; @@ -193,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3dc26b4118..c87b8e84f4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint backup type + char backup[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(backup, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); + + // history scan idle char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); @@ -1644,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); - // checkpoint info + // checkpoint version pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + // checkpoint backup status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + // ds_err_info pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, 0, true); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 04c0c0d204..924b0a8207 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +typedef struct SMStreamCheckpointReadyRspMsg { + SMsgHead head; +} SMStreamCheckpointReadyRspMsg; + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - int64_t initTs = 0; - int64_t now = taosGetTimestampMs(); - STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId}; - STaskId fId = {0}; - bool hasHistoryTask = false; - - // todo extract method if (!isLeader) { - // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map. - streamMetaRLock(pMeta); - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - } else { - streamMetaRUnLock(pMeta); - - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - return code; + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - streamMetaRLock(pMeta); - - // let's try to find this task in hashmap - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - } else { // not exist even in the hash map of meta, forget it - streamMetaRUnLock(pMeta); - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return code; + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } code = streamProcessCheckRsp(pTask, &rsp); @@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } -typedef struct SMStreamCheckpointReadyRspMsg { - SMsgHead head; -} SMStreamCheckpointReadyRspMsg; - int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; + } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { + int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 07dce9a451..0ee31197dc 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); -void streamTaskSetCheckpointFailedId(SStreamTask* pTask); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskGetTaskId(const SStreamTask* pTask); @@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); -int32_t deleteCheckpointFile(char* id, char* name); -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 0a87833055..ea9b2ef89f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); + stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); @@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { pInfo->stopCheckProcess = 1; taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set stop check rsp mon", id); + stDebug("s-task:%s set stop check-rsp monit", id); return TSDB_CODE_SUCCESS; } @@ -272,6 +272,8 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; + pInfo->timeoutStartTs = startTs; + pInfo->stopCheckProcess = 0; return TSDB_CODE_SUCCESS; } @@ -293,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -329,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { return TSDB_CODE_FAILED; } - stDebug("s-task:%s set the in-check-procedure flag", id); + stDebug("s-task:%s set the in check-rsp flag", id); return TSDB_CODE_SUCCESS; } @@ -343,9 +345,10 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* } int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; - stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; + pInfo->timeoutStartTs = 0; pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; @@ -458,6 +461,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); ASSERT(pTask->status.downstreamReady == 0); + pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); @@ -488,7 +492,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); } else { stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs); } } @@ -517,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } +// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. +// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution +// of restart in timer thread will result in a dead lock. +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); + return -1; + } + + stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return 0; +} + +// this function is executed in timer thread void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamMeta* pMeta = pTask->pMeta; @@ -524,7 +552,7 @@ void rspMonitorFn(void* param, void* tmrId) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); - int64_t el = now - pInfo->startTs; + int64_t timeoutDuration = now - pInfo->timeoutStartTs; ETaskStatus state = pStat->state; const char* id = pTask->id.idStr; int32_t numOfReady = 0; @@ -541,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return; @@ -577,7 +600,7 @@ void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -614,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); @@ -639,8 +656,10 @@ void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", - id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug( + "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, " + "ready:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e6d7c2fde8..3428fc36e1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -26,6 +26,9 @@ typedef struct { SStreamTask* pTask; } SAsyncUploadArg; +static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); +static int32_t deleteCheckpointFile(char* id, char* name); + int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -376,21 +379,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { return code; } -void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); } -int32_t getChkpMeta(char* id, char* path, SArray* list) { +static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { char* file = taosMemoryCalloc(1, strlen(path) + 32); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); - int32_t code = downloadCheckpointByName(id, "META", file); + + int32_t code = downloadCheckpointDataByName(id, "META", file); if (code != 0) { stDebug("chkp failed to download meta file:%s", file); taosMemoryFree(file); return code; } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); char buf[128] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { @@ -427,7 +432,7 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { + if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } @@ -457,8 +462,7 @@ int32_t uploadCheckpointData(void* param) { return code; } -int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { - // async upload +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -514,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadChkp(pTask, ckId, (char*)id); + code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } @@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } @@ -585,12 +589,12 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { stDebug("[s3] upload checkpoint:%s", filename); // break; } - taosCloseDir(&pDir); + taosCloseDir(&pDir); return 0; } -static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { +static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); @@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) { } // fileName: CURRENT -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return 0; } else if (tsS3StreamEnabled) { return downloadCheckpointByNameS3(id, fname, dstName); } + return 0; } @@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) { stError("downloadCheckpoint parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return downloadRsync(id, path); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); } + return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 891e0aa142..250866005e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t doStreamExecTask(SStreamTask* pTask) { +static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a464594233..edc1a148a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } else { stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); } @@ -1706,4 +1706,39 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { } return 0; +} + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + + streamMetaRLock(pMeta); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; + + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } + } else { + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index c76536aedf..b3df5755ea 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); } @@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 56ef8c6b47..9a112c669e 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult))