Merge pull request #25538 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
f115a33a94
|
@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
|
||||||
void s3EvictCache(const char *path, long object_size);
|
void s3EvictCache(const char *path, long object_size);
|
||||||
long s3Size(const char *object_name);
|
long s3Size(const char *object_name);
|
||||||
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
|
int32_t s3GetObjectToFile(const char *object_name, const char *fileName);
|
||||||
|
|
||||||
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)
|
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ extern "C" {
|
||||||
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
||||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||||
|
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
|
@ -443,6 +444,7 @@ typedef struct SDownstreamStatusInfo {
|
||||||
typedef struct STaskCheckInfo {
|
typedef struct STaskCheckInfo {
|
||||||
SArray* pList;
|
SArray* pList;
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
|
int64_t timeoutStartTs;
|
||||||
int32_t notReadyTasks;
|
int32_t notReadyTasks;
|
||||||
int32_t inCheckProcess;
|
int32_t inCheckProcess;
|
||||||
int32_t stopCheckProcess;
|
int32_t stopCheckProcess;
|
||||||
|
@ -547,7 +549,7 @@ typedef struct SStreamMeta {
|
||||||
SArray* chkpSaved;
|
SArray* chkpSaved;
|
||||||
SArray* chkpInUse;
|
SArray* chkpInUse;
|
||||||
SRWLatch chkpDirLock;
|
SRWLatch chkpDirLock;
|
||||||
void* qHandle;
|
void* qHandle; // todo remove it
|
||||||
void* bkdChkptMgt;
|
void* bkdChkptMgt;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
|
@ -885,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
int64_t endTs, bool ready);
|
int64_t endTs, bool ready);
|
||||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call
|
||||||
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
|
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
|
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
||||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||||
|
|
||||||
|
@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
||||||
void s3EvictCache(const char *path, long object_size) {}
|
void s3EvictCache(const char *path, long object_size) {}
|
||||||
long s3Size(const char *object_name) { return 0; }
|
long s3Size(const char *object_name) { return 0; }
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
|
||||||
int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; }
|
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; }
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
|
||||||
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -193,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
|
|
@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
|
||||||
|
|
||||||
|
// checkpoint backup type
|
||||||
|
char backup[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(backup, "none")
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
|
||||||
|
|
||||||
|
// history scan idle
|
||||||
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
strcpy(scanHistoryIdle, "100a");
|
strcpy(scanHistoryIdle, "100a");
|
||||||
|
|
||||||
|
@ -1644,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
|
||||||
|
|
||||||
// checkpoint info
|
// checkpoint version
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
||||||
|
|
||||||
|
// checkpoint backup status
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
||||||
// ds_err_info
|
// ds_err_info
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, 0, true);
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
|
@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry {
|
||||||
int32_t transId;
|
int32_t transId;
|
||||||
} STaskUpdateEntry;
|
} STaskUpdateEntry;
|
||||||
|
|
||||||
|
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||||
|
SMsgHead head;
|
||||||
|
} SMStreamCheckpointReadyRspMsg;
|
||||||
|
|
||||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->info.fillHistory);
|
ASSERT(pTask->info.fillHistory);
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
|
@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||||
|
|
||||||
int64_t initTs = 0;
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId};
|
|
||||||
STaskId fId = {0};
|
|
||||||
bool hasHistoryTask = false;
|
|
||||||
|
|
||||||
// todo extract method
|
|
||||||
if (!isLeader) {
|
if (!isLeader) {
|
||||||
// this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map.
|
|
||||||
streamMetaRLock(pMeta);
|
|
||||||
|
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
|
||||||
if (ppTask != NULL) {
|
|
||||||
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
|
|
||||||
if (hasHistoryTask) {
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
||||||
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
||||||
} else {
|
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
|
|
||||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
|
||||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
|
||||||
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
streamMetaRLock(pMeta);
|
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
|
|
||||||
// let's try to find this task in hashmap
|
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
|
||||||
if (ppTask != NULL) {
|
|
||||||
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
|
|
||||||
if (hasHistoryTask) {
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
|
||||||
}
|
|
||||||
} else { // not exist even in the hash map of meta, forget it
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
|
|
||||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
|
||||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
|
||||||
|
|
||||||
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamProcessCheckRsp(pTask, &rsp);
|
code = streamProcessCheckRsp(pTask, &rsp);
|
||||||
|
@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMStreamCheckpointReadyRspMsg {
|
|
||||||
SMsgHead head;
|
|
||||||
} SMStreamCheckpointReadyRspMsg;
|
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
||||||
streamMetaStopAllTasks(pMeta);
|
streamMetaStopAllTasks(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||||
|
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
return code;
|
||||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
|
|
|
@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
|
||||||
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
|
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
|
||||||
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||||
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
|
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
|
||||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||||
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
|
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
|
||||||
|
@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||||
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||||
int32_t downloadCheckpoint(char* id, char* path);
|
int32_t downloadCheckpoint(char* id, char* path);
|
||||||
int32_t deleteCheckpoint(char* id);
|
int32_t deleteCheckpoint(char* id);
|
||||||
int32_t deleteCheckpointFile(char* id, char* name);
|
|
||||||
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
|
|
||||||
|
|
||||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||||
|
|
|
@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||||
|
|
||||||
if (pInfo->checkRspTmr == NULL) {
|
if (pInfo->checkRspTmr == NULL) {
|
||||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
|
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
|
||||||
|
@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||||
pInfo->stopCheckProcess = 1;
|
pInfo->stopCheckProcess = 1;
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
stDebug("s-task:%s set stop check rsp mon", id);
|
stDebug("s-task:%s set stop check-rsp monit", id);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,6 +272,8 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->startTs = startTs;
|
pInfo->startTs = startTs;
|
||||||
|
pInfo->timeoutStartTs = startTs;
|
||||||
|
pInfo->stopCheckProcess = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
if (reqId != p->reqId) {
|
if (reqId != p->reqId) {
|
||||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
||||||
id, reqId, p->reqId, taskId);
|
id, reqId, p->reqId, taskId);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -329,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s set the in-check-procedure flag", id);
|
stDebug("s-task:%s set the in check-rsp flag", id);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,9 +345,10 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
||||||
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
|
stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el);
|
||||||
|
|
||||||
pInfo->startTs = 0;
|
pInfo->startTs = 0;
|
||||||
|
pInfo->timeoutStartTs = 0;
|
||||||
pInfo->notReadyTasks = 0;
|
pInfo->notReadyTasks = 0;
|
||||||
pInfo->inCheckProcess = 0;
|
pInfo->inCheckProcess = 0;
|
||||||
pInfo->stopCheckProcess = 0;
|
pInfo->stopCheckProcess = 0;
|
||||||
|
@ -458,6 +461,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
||||||
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
|
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
pInfo->timeoutStartTs = taosGetTimestampMs();
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||||
|
@ -488,7 +492,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
||||||
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
|
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
|
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
|
||||||
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
|
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -517,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
||||||
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
|
||||||
|
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
|
||||||
|
// of restart in timer thread will result in a dead lock.
|
||||||
|
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
||||||
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
|
if (pRunReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId);
|
||||||
|
|
||||||
|
pRunReq->head.vgId = vgId;
|
||||||
|
pRunReq->streamId = streamId;
|
||||||
|
pRunReq->taskId = taskId;
|
||||||
|
pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK;
|
||||||
|
|
||||||
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
|
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this function is executed in timer thread
|
||||||
void rspMonitorFn(void* param, void* tmrId) {
|
void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
@ -524,7 +552,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int64_t el = now - pInfo->startTs;
|
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
|
||||||
ETaskStatus state = pStat->state;
|
ETaskStatus state = pStat->state;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfReady = 0;
|
int32_t numOfReady = 0;
|
||||||
|
@ -541,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
|
@ -577,7 +600,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
if (pStat->state == TASK_STATUS__UNINIT) {
|
if (pStat->state == TASK_STATUS__UNINIT) {
|
||||||
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
||||||
} else { // unexpected status
|
} else { // unexpected status
|
||||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
||||||
}
|
}
|
||||||
|
@ -614,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
@ -639,8 +656,10 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
|
stDebug(
|
||||||
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
"s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
|
||||||
|
"ready:%d",
|
||||||
|
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
|
|
@ -26,6 +26,9 @@ typedef struct {
|
||||||
SStreamTask* pTask;
|
SStreamTask* pTask;
|
||||||
} SAsyncUploadArg;
|
} SAsyncUploadArg;
|
||||||
|
|
||||||
|
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||||
|
static int32_t deleteCheckpointFile(char* id, char* name);
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -376,21 +379,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
|
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
|
||||||
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
|
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
|
||||||
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
|
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
|
||||||
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
|
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getChkpMeta(char* id, char* path, SArray* list) {
|
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
|
||||||
char* file = taosMemoryCalloc(1, strlen(path) + 32);
|
char* file = taosMemoryCalloc(1, strlen(path) + 32);
|
||||||
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
|
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
|
||||||
int32_t code = downloadCheckpointByName(id, "META", file);
|
|
||||||
|
int32_t code = downloadCheckpointDataByName(id, "META", file);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stDebug("chkp failed to download meta file:%s", file);
|
stDebug("chkp failed to download meta file:%s", file);
|
||||||
taosMemoryFree(file);
|
taosMemoryFree(file);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
|
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
|
||||||
|
@ -427,7 +432,7 @@ int32_t uploadCheckpointData(void* param) {
|
||||||
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
|
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
|
||||||
}
|
}
|
||||||
if (arg->type == DATA_UPLOAD_S3) {
|
if (arg->type == DATA_UPLOAD_S3) {
|
||||||
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
|
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
|
||||||
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
|
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -457,8 +462,7 @@ int32_t uploadCheckpointData(void* param) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
||||||
// async upload
|
|
||||||
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
|
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
|
||||||
if (type == DATA_UPLOAD_DISABLE) {
|
if (type == DATA_UPLOAD_DISABLE) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -514,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamSaveTaskCheckpointInfo(pTask, ckId);
|
code = streamSaveTaskCheckpointInfo(pTask, ckId);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamTaskUploadChkp(pTask, ckId, (char*)id);
|
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
|
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
|
||||||
}
|
}
|
||||||
|
@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
streamTaskSetCheckpointFailedId(pTask);
|
streamTaskSetFailedCheckpointId(pTask);
|
||||||
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
|
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,12 +589,12 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
|
||||||
stDebug("[s3] upload checkpoint:%s", filename);
|
stDebug("[s3] upload checkpoint:%s", filename);
|
||||||
// break;
|
// break;
|
||||||
}
|
}
|
||||||
taosCloseDir(&pDir);
|
|
||||||
|
|
||||||
|
taosCloseDir(&pDir);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
|
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
|
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
|
||||||
sprintf(buf, "%s/%s", id, fname);
|
sprintf(buf, "%s/%s", id, fname);
|
||||||
|
@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fileName: CURRENT
|
// fileName: CURRENT
|
||||||
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
|
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
|
||||||
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
||||||
stError("uploadCheckpointByName parameters invalid");
|
stError("uploadCheckpointByName parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return 0;
|
return 0;
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return downloadCheckpointByNameS3(id, fname, dstName);
|
return downloadCheckpointByNameS3(id, fname, dstName);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) {
|
||||||
stError("downloadCheckpoint parameters invalid");
|
stError("downloadCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return downloadRsync(id, path);
|
return downloadRsync(id, path);
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return s3GetObjectsByPrefix(id, path);
|
return s3GetObjectsByPrefix(id, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx
|
||||||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||||
*/
|
*/
|
||||||
int32_t doStreamExecTask(SStreamTask* pTask) {
|
static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
|
|
|
@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
SStreamTaskState* pState = streamTaskGetStatus(pTask);
|
SStreamTaskState* pState = streamTaskGetStatus(pTask);
|
||||||
if (pState->state == TASK_STATUS__CK) {
|
if (pState->state == TASK_STATUS__CK) {
|
||||||
streamTaskSetCheckpointFailedId(pTask);
|
streamTaskSetFailedCheckpointId(pTask);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
|
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
|
||||||
}
|
}
|
||||||
|
@ -1707,3 +1707,38 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int64_t startTs = 0;
|
||||||
|
bool hasFillhistoryTask = false;
|
||||||
|
STaskId hId = {0};
|
||||||
|
|
||||||
|
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
|
||||||
|
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
|
||||||
|
if (ppTask != NULL) {
|
||||||
|
startTs = (*ppTask)->taskCheckInfo.startTs;
|
||||||
|
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
|
||||||
|
hId = (*ppTask)->hTaskInfo.id;
|
||||||
|
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
|
// add the failed task info, along with the related fill-history task info into tasks list.
|
||||||
|
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||||
|
if (hasFillhistoryTask) {
|
||||||
|
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||||
|
streamId, taskId, pMeta->vgId);
|
||||||
|
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
streamTaskSetCheckpointFailedId(pTask);
|
streamTaskSetFailedCheckpointId(pTask);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
streamTaskSetCheckpointFailedId(pTask);
|
streamTaskSetFailedCheckpointId(pTask);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
|
|
@ -222,7 +222,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||||
tdLog.info(len(tdSql.queryResult))
|
tdLog.info(len(tdSql.queryResult))
|
||||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252))
|
tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||||
tdSql.checkEqual(54, len(tdSql.queryResult))
|
tdSql.checkEqual(54, len(tdSql.queryResult))
|
||||||
|
|
Loading…
Reference in New Issue