Merge pull request #25517 from taosdata/fix/3_liaohj
fix(stream): add the downstream task's node to the update list if it has timed out for more than 100 sec.
This commit is contained in:
commit
5f5595a6ab
|
@ -435,6 +435,7 @@ typedef struct SUpstreamInfo {
|
||||||
typedef struct SDownstreamStatusInfo {
|
typedef struct SDownstreamStatusInfo {
|
||||||
int64_t reqId;
|
int64_t reqId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
int32_t vgId;
|
||||||
int64_t rspTs;
|
int64_t rspTs;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
} SDownstreamStatusInfo;
|
} SDownstreamStatusInfo;
|
||||||
|
@ -847,12 +848,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
|
||||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
|
||||||
int32_t* pNotReady, const char* id);
|
|
||||||
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
||||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
|
|
@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsCompactPullupInterval = 10;
|
int32_t tsCompactPullupInterval = 10;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointInterval = 300;
|
int32_t tsStreamCheckpointInterval = 60;
|
||||||
float tsSinkDataRate = 2.0;
|
float tsSinkDataRate = 2.0;
|
||||||
int32_t tsStreamNodeCheckInterval = 16;
|
int32_t tsStreamNodeCheckInterval = 16;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
|
|
|
@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
|
maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
|
||||||
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
|
mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
|
||||||
pStream->checkpointId);
|
pStream->checkpointId);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1156,14 +1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
// check if the checkpoint msg already sent or not.
|
// check if the checkpoint msg already sent or not.
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
||||||
" transId:%d already received, ignore this msg and continue process checkpoint",
|
" transId:%d already handled, ignore msg and continue process checkpoint",
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
|
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else { // checkpoint already finished, and not in checkpoint status
|
||||||
|
if (req.checkpointId == pTask->chkInfo.checkpointId) {
|
||||||
|
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
||||||
|
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamProcessCheckpointSourceReq(pTask, &req);
|
streamProcessCheckpointSourceReq(pTask, &req);
|
||||||
|
|
|
@ -69,6 +69,7 @@ typedef struct {
|
||||||
int64_t chkpId;
|
int64_t chkpId;
|
||||||
char* dbPrefixPath;
|
char* dbPrefixPath;
|
||||||
} SStreamTaskSnap;
|
} SStreamTaskSnap;
|
||||||
|
|
||||||
struct STokenBucket {
|
struct STokenBucket {
|
||||||
int32_t numCapacity; // total capacity, available token per second
|
int32_t numCapacity; // total capacity, available token per second
|
||||||
int32_t numOfToken; // total available tokens
|
int32_t numOfToken; // total available tokens
|
||||||
|
@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
void streamMetaRemoveDB(void* arg, char* key);
|
void streamMetaRemoveDB(void* arg, char* key);
|
||||||
|
|
||||||
typedef enum UPLOAD_TYPE {
|
typedef enum ECHECKPOINT_BACKUP_TYPE {
|
||||||
UPLOAD_DISABLE = -1,
|
DATA_UPLOAD_DISABLE = -1,
|
||||||
UPLOAD_S3 = 0,
|
DATA_UPLOAD_S3 = 0,
|
||||||
UPLOAD_RSYNC = 1,
|
DATA_UPLOAD_RSYNC = 1,
|
||||||
} UPLOAD_TYPE;
|
} ECHECKPOINT_BACKUP_TYPE;
|
||||||
|
|
||||||
UPLOAD_TYPE getUploadType();
|
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||||
int uploadCheckpoint(char* id, char* path);
|
|
||||||
int downloadCheckpoint(char* id, char* path);
|
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||||
int deleteCheckpoint(char* id);
|
int32_t downloadCheckpoint(char* id, char* path);
|
||||||
int deleteCheckpointFile(char* id, char* name);
|
int32_t deleteCheckpoint(char* id);
|
||||||
int downloadCheckpointByName(char* id, char* fname, char* dstName);
|
int32_t deleteCheckpointFile(char* id, char* name);
|
||||||
|
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
|
||||||
|
|
||||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||||
|
|
|
@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||||
UPLOAD_TYPE type = getUploadType();
|
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
|
||||||
if (type == UPLOAD_S3) {
|
if (type == DATA_UPLOAD_S3) {
|
||||||
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
|
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
|
||||||
} else if (type == UPLOAD_RSYNC) {
|
} else if (type == DATA_UPLOAD_RSYNC) {
|
||||||
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
|
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
||||||
}
|
}
|
||||||
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
||||||
STaskDbWrapper* pDb = arg;
|
STaskDbWrapper* pDb = arg;
|
||||||
UPLOAD_TYPE utype = type;
|
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||||
|
|
||||||
if (utype == UPLOAD_RSYNC) {
|
if (utype == DATA_UPLOAD_RSYNC) {
|
||||||
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
||||||
} else if (utype == UPLOAD_S3) {
|
} else if (utype == DATA_UPLOAD_S3) {
|
||||||
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
|
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
|
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
|
||||||
|
|
||||||
static void processDownstreamReadyRsp(SStreamTask* pTask);
|
static void processDownstreamReadyRsp(SStreamTask* pTask);
|
||||||
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
|
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
|
||||||
|
@ -26,16 +26,22 @@ static void rspMonitorFn(void* param, void* tmrId);
|
||||||
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
||||||
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
||||||
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
|
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
|
||||||
|
static int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
|
||||||
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
|
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
|
||||||
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
|
||||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
|
static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
|
||||||
|
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
|
||||||
|
int64_t reqId, int32_t* pNotReady, const char* id);
|
||||||
|
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
|
||||||
|
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||||
|
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
|
||||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
||||||
|
|
||||||
// check status
|
// check status
|
||||||
void streamTaskCheckDownstream(SStreamTask* pTask) {
|
void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
SDataRange* pRange = &pTask->dataRange;
|
SDataRange* pRange = &pTask->dataRange;
|
||||||
STimeWindow* pWindow = &pRange->window;
|
STimeWindow* pWindow = &pRange->window;
|
||||||
|
const char* idstr = pTask->id.idStr;
|
||||||
|
|
||||||
SStreamTaskCheckReq req = {
|
SStreamTaskCheckReq req = {
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = pTask->id.streamId,
|
||||||
|
@ -51,16 +57,15 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||||
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
|
||||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
|
||||||
|
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||||
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
|
||||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||||
|
|
||||||
|
@ -70,25 +75,23 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
|
||||||
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
req.reqId = tGenIdPI64();
|
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
|
||||||
|
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
|
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else { // for sink task, set it ready directly.
|
} else { // for sink task, set it ready directly.
|
||||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
|
||||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
|
||||||
processDownstreamReadyRsp(pTask);
|
processDownstreamReadyRsp(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,31 +161,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
|
||||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->pList, &info);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -307,10 +291,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
|
|
||||||
if (reqId != p->reqId) {
|
if (reqId != p->reqId) {
|
||||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
||||||
" expired check-rsp recv from downstream task:0x%x, discarded",
|
|
||||||
id, reqId, p->reqId, taskId);
|
id, reqId, p->reqId, taskId);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -341,7 +323,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
pInfo->inCheckProcess = 1;
|
pInfo->inCheckProcess = 1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pInfo->startTs > 0);
|
ASSERT(pInfo->startTs > 0);
|
||||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
|
||||||
|
pInfo->startTs);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,6 +361,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record that a check request has been issued to one downstream task.
//
// pInfo:  check-tracking state of the upstream task; pInfo->pList holds one
//         SDownstreamStatusInfo per downstream task already contacted.
// reqId:  id of the check request being sent.
// taskId: id of the downstream task; used as the lookup key in pInfo->pList.
// vgId:   vgroup (node) id of the downstream task, stored with the entry.
// id:     id string of the calling task, used only for logging.
//
// Always returns TSDB_CODE_SUCCESS. If an entry for taskId already exists the
// list is left untouched and only a debug line is written. The whole
// lookup-and-insert runs under pInfo->checkInfoLock.
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  taosThreadMutexLock(&pInfo->checkInfoLock);

  if (findCheckRspStatus(pInfo, taskId) == NULL) {
    // New entry starts with status -1 and rspTs 0, the same values
    // handleNotReadyDownstreamTask resets an entry to before re-sending.
    SDownstreamStatusInfo entry = {.reqId = reqId, .taskId = taskId, .vgId = vgId, .rspTs = 0, .status = -1};
    taosArrayPush(pInfo->pList, &entry);
  } else {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
  }

  taosThreadMutexUnlock(&pInfo->checkInfoLock);
  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
SStreamTaskCheckReq req = {
|
SStreamTaskCheckReq req = {
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = pTask->id.streamId,
|
||||||
|
@ -389,9 +390,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
|
|
||||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
req.reqId = p->reqId;
|
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
|
||||||
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
|
||||||
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||||
|
|
||||||
|
@ -404,12 +405,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
|
||||||
if (p->taskId == pVgInfo->taskId) {
|
if (p->taskId == pVgInfo->taskId) {
|
||||||
req.reqId = p->reqId;
|
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
||||||
p->reqId);
|
p->reqId);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
@ -423,7 +422,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
|
|
||||||
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
@ -447,6 +445,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fill in the per-destination fields of a check request.
//
// pReq:      request to update; only reqId, downstreamTaskId and
//            downstreamNodeId are written.
// reqId:     request id to stamp on the message.
// dstTaskId: id of the downstream task being checked.
// dstNodeId: node (vgroup) id hosting that downstream task.
//
// Note the argument order: task id comes BEFORE node id. Both are int32_t, so
// the compiler will not catch swapped or duplicated arguments at call sites.
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
|
||||||
|
|
||||||
|
// Handle downstream tasks whose check request has timed out.
//
// pTask:        the upstream task that is checking its downstream tasks.
// pTimeoutList: array of int32_t downstream task ids reported as timed out.
//
// Steps:
//   1. Re-send the check message to every listed task still tracked in
//      pTask->taskCheckInfo.
//   2. Increment taskCheckInfo.timeoutRetryCount.
//   3. Once the counter exceeds 10, reset it to 0 and pass the vgId of every
//      listed task to addIntoNodeUpdateList(); otherwise only log the retry.
//
// NOTE(review): the "100 sec" wording assumes each retry round corresponds to
// one CHECK_NOT_RSP_DURATION (10 sec) — confirm against getCheckRspStatus.
// NOTE(review): rspMonitorFn releases taskCheckInfo.checkInfoLock after calling
// this function, so it appears to run with that lock held — confirm.
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = taosArrayGetSize(pTimeoutList);

  ASSERT(pTask->status.downstreamReady == 0);

  // re-send the check msg to every downstream task that has not responded in time
  for (int32_t i = 0; i < numOfTimeout; ++i) {
    int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);

    SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
    if (p != NULL) {
      // a timed-out entry must still be in the "no response recorded" state
      ASSERT(p->status == -1 && p->rspTs == 0);
      doSendCheckMsg(pTask, p);
    }
  }

  pInfo->timeoutRetryCount += 1;

  // timeout more than 100 sec, add into node update list
  if (pInfo->timeoutRetryCount > 10) {
    pInfo->timeoutRetryCount = 0;

    for (int32_t i = 0; i < numOfTimeout; ++i) {
      int32_t                taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
      SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
      if (p != NULL) {
        addIntoNodeUpdateList(pTask, p->vgId);
        stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
                id, vgId, p->taskId, p->vgId);
      }
    }

    stDebug("s-task:%s vgId:%d %d downstream task(s) all added into nodeUpdate list", id, vgId, numOfTimeout);
  } else {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
  }
}
|
||||||
|
|
||||||
|
// Retry the check for downstream tasks that answered "not ready".
//
// pTask:         the upstream task that is checking its downstream tasks.
// pNotReadyList: array of int32_t downstream task ids that reported not ready.
//
// For every listed task still tracked in pTask->taskCheckInfo, the stored
// response is cleared (status = -1, rspTs = 0) and the check message is sent
// again. Afterwards taskCheckInfo.notReadyRetryCount is incremented and the
// retry is logged.
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pCheckInfo = &pTask->taskCheckInfo;
  const char*     idStr = pTask->id.idStr;
  int32_t         nodeId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = taosArrayGetSize(pNotReadyList);

  ASSERT(pTask->status.downstreamReady == 0);

  for (int32_t idx = 0; idx < numOfNotReady; ++idx) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, idx);

    SDownstreamStatusInfo* pEntry = findCheckRspStatus(pCheckInfo, *pTaskId);
    if (pEntry == NULL) {
      continue;  // task is not tracked in the check list, nothing to retry
    }

    // forget the previous "not ready" answer before asking again
    pEntry->rspTs = 0;
    pEntry->status = -1;
    doSendCheckMsg(pTask, pEntry);
  }

  pCheckInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, idStr,
          nodeId, numOfNotReady, pCheckInfo->notReadyRetryCount, pCheckInfo->startTs);
}
|
||||||
|
|
||||||
void rspMonitorFn(void* param, void* tmrId) {
|
void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||||
|
@ -461,6 +531,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
int32_t numOfNotRsp = 0;
|
int32_t numOfNotRsp = 0;
|
||||||
int32_t numOfNotReady = 0;
|
int32_t numOfNotReady = 0;
|
||||||
int32_t numOfTimeout = 0;
|
int32_t numOfTimeout = 0;
|
||||||
|
int32_t total = taosArrayGetSize(pInfo->pList);
|
||||||
|
|
||||||
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
||||||
|
|
||||||
|
@ -510,7 +581,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
// fault tasks detected, not try anymore
|
// fault tasks detected, not try anymore
|
||||||
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total);
|
||||||
if (numOfFault > 0) {
|
if (numOfFault > 0) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug(
|
stDebug(
|
||||||
|
@ -550,57 +621,18 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfNotReady > 0) { // check to make sure not in recheck timer
|
if (numOfNotReady > 0) { // check to make sure not in recheck timer
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
handleNotReadyDownstreamTask(pTask, pNotReadyList);
|
||||||
|
|
||||||
// reset the info, and send the check msg to failure downstream again
|
|
||||||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
p->rspTs = 0;
|
|
||||||
p->status = -1;
|
|
||||||
doSendCheckMsg(pTask, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->notReadyRetryCount += 1;
|
|
||||||
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
|
|
||||||
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add into node update list and send to mnode
|
|
||||||
if (numOfTimeout > 0) {
|
if (numOfTimeout > 0) {
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
|
||||||
doSendCheckMsg(pTask, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->timeoutRetryCount += 1;
|
|
||||||
|
|
||||||
// timeout more than 100 sec, add into node update list
|
|
||||||
if (pInfo->timeoutRetryCount > 10) {
|
|
||||||
pInfo->timeoutRetryCount = 0;
|
|
||||||
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId,
|
|
||||||
numOfTimeout);
|
|
||||||
} else {
|
|
||||||
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
|
|
||||||
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
|
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
|
||||||
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
UPLOAD_TYPE type;
|
ECHECKPOINT_BACKUP_TYPE type;
|
||||||
char* taskId;
|
char* taskId;
|
||||||
int64_t chkpId;
|
int64_t chkpId;
|
||||||
|
|
||||||
|
@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doUploadChkp(void* param) {
|
int32_t uploadCheckpointData(void* param) {
|
||||||
SAsyncUploadArg* arg = param;
|
SAsyncUploadArg* arg = param;
|
||||||
char* path = NULL;
|
char* path = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) {
|
||||||
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
|
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
|
||||||
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
|
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
|
||||||
}
|
}
|
||||||
if (arg->type == UPLOAD_S3) {
|
if (arg->type == DATA_UPLOAD_S3) {
|
||||||
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
|
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
|
||||||
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
|
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) {
|
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
|
||||||
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
|
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,8 +459,8 @@ int32_t doUploadChkp(void* param) {
|
||||||
|
|
||||||
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
||||||
// async upload
|
// async upload
|
||||||
UPLOAD_TYPE type = getUploadType();
|
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
|
||||||
if (type == UPLOAD_DISABLE) {
|
if (type == DATA_UPLOAD_DISABLE) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
||||||
arg->chkpId = chkpId;
|
arg->chkpId = chkpId;
|
||||||
arg->pTask = pTask;
|
arg->pTask = pTask;
|
||||||
|
|
||||||
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
|
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
|
@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int uploadCheckpointToS3(char* id, char* path) {
|
static int32_t uploadCheckpointToS3(char* id, char* path) {
|
||||||
TdDirPtr pDir = taosOpenDir(path);
|
TdDirPtr pDir = taosOpenDir(path);
|
||||||
if (pDir == NULL) return -1;
|
if (pDir == NULL) return -1;
|
||||||
|
|
||||||
|
@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
|
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
|
||||||
int code = 0;
|
int32_t code = 0;
|
||||||
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
|
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
|
||||||
sprintf(buf, "%s/%s", id, fname);
|
sprintf(buf, "%s/%s", id, fname);
|
||||||
if (s3GetObjectToFile(buf, dstName) != 0) {
|
if (s3GetObjectToFile(buf, dstName) != 0) {
|
||||||
|
@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
UPLOAD_TYPE getUploadType() {
|
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return UPLOAD_RSYNC;
|
return DATA_UPLOAD_RSYNC;
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return UPLOAD_S3;
|
return DATA_UPLOAD_S3;
|
||||||
} else {
|
} else {
|
||||||
return UPLOAD_DISABLE;
|
return DATA_UPLOAD_DISABLE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int uploadCheckpoint(char* id, char* path) {
|
int32_t streamTaskBackupCheckpoint(char* id, char* path) {
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("uploadCheckpoint parameters invalid");
|
stError("streamTaskBackupCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
|
@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fileName: CURRENT
|
// fileName: CURRENT
|
||||||
int downloadCheckpointByName(char* id, char* fname, char* dstName) {
|
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
|
||||||
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
||||||
stError("uploadCheckpointByName parameters invalid");
|
stError("uploadCheckpointByName parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int downloadCheckpoint(char* id, char* path) {
|
int32_t downloadCheckpoint(char* id, char* path) {
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("downloadCheckpoint parameters invalid");
|
stError("downloadCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteCheckpoint(char* id) {
|
int32_t deleteCheckpoint(char* id) {
|
||||||
if (id == NULL || strlen(id) == 0) {
|
if (id == NULL || strlen(id) == 0) {
|
||||||
stError("deleteCheckpoint parameters invalid");
|
stError("deleteCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteCheckpointFile(char* id, char* name) {
|
int32_t deleteCheckpointFile(char* id, char* name) {
|
||||||
char object[128] = {0};
|
char object[128] = {0};
|
||||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||||
char* tmp = object;
|
char* tmp = object;
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
|
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
|
||||||
#define WAIT_FOR_DURATION 10
|
|
||||||
|
|
||||||
// todo refactor:
|
// todo refactor:
|
||||||
// read data from input queue
|
// read data from input queue
|
||||||
|
|
Loading…
Reference in New Issue