fix(stream): discard repeatedly received checkpoint-source msg.

This commit is contained in:
Haojun Liao 2024-04-26 15:56:34 +08:00
parent 3961903fea
commit 8b53f76691
7 changed files with 53 additions and 42 deletions

View File

@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsCompactPullupInterval = 10; int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 300; int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0; float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 16; int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;

View File

@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
if (pIter == NULL) break; if (pIter == NULL) break;
maxChkptId = TMAX(maxChkptId, pStream->checkpointId); maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
pStream->checkpointId); pStream->checkpointId);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }

View File

@ -1156,14 +1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not. // check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already received, ignore this msg and continue process checkpoint", " transId:%d already handled, ignore msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId == pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
} }
streamProcessCheckpointSourceReq(pTask, &req); streamProcessCheckpointSourceReq(pTask, &req);

View File

@ -69,6 +69,7 @@ typedef struct {
int64_t chkpId; int64_t chkpId;
char* dbPrefixPath; char* dbPrefixPath;
} SStreamTaskSnap; } SStreamTaskSnap;
struct STokenBucket { struct STokenBucket {
int32_t numCapacity; // total capacity, available token per second int32_t numCapacity; // total capacity, available token per second
int32_t numOfToken; // total available tokens int32_t numOfToken; // total available tokens
@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key); void streamMetaRemoveDB(void* arg, char* key);
typedef enum UPLOAD_TYPE { typedef enum ECHECKPOINT_BACKUP_TYPE {
UPLOAD_DISABLE = -1, DATA_UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0, DATA_UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1, DATA_UPLOAD_RSYNC = 1,
} UPLOAD_TYPE; } ECHECKPOINT_BACKUP_TYPE;
UPLOAD_TYPE getUploadType(); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path); int32_t streamTaskBackupCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id); int32_t downloadCheckpoint(char* id, char* path);
int deleteCheckpointFile(char* id, char* name); int32_t deleteCheckpoint(char* id);
int downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t deleteCheckpointFile(char* id, char* name);
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
return code; return code;
} }
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
UPLOAD_TYPE type = getUploadType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == UPLOAD_S3) { if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
} else if (type == UPLOAD_RSYNC) { } else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
} }
return -1; return -1;
@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
} }
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
STaskDbWrapper* pDb = arg; STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type; ECHECKPOINT_BACKUP_TYPE utype = type;
if (utype == UPLOAD_RSYNC) { if (utype == DATA_UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == UPLOAD_S3) { } else if (utype == DATA_UPLOAD_S3) {
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
} }
return -1; return -1;

View File

@ -19,7 +19,7 @@
#include "streamInt.h" #include "streamInt.h"
typedef struct { typedef struct {
UPLOAD_TYPE type; ECHECKPOINT_BACKUP_TYPE type;
char* taskId; char* taskId;
int64_t chkpId; int64_t chkpId;
@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) {
return code; return code;
} }
int32_t doUploadChkp(void* param) { int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* arg = param; SAsyncUploadArg* arg = param;
char* path = NULL; char* path = NULL;
int32_t code = 0; int32_t code = 0;
@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) {
(int8_t)(arg->type), &path, toDelFiles)) != 0) { (int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
} }
if (arg->type == UPLOAD_S3) { if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
} }
} }
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
} }
@ -459,8 +459,8 @@ int32_t doUploadChkp(void* param) {
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload // async upload
UPLOAD_TYPE type = getUploadType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
return 0; return 0;
} }
@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
arg->chkpId = chkpId; arg->chkpId = chkpId;
arg->pTask = pTask; arg->pTask = pTask;
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
} }
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code; return code;
} }
static int uploadCheckpointToS3(char* id, char* path) { static int32_t uploadCheckpointToS3(char* id, char* path) {
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1; if (pDir == NULL) return -1;
@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) {
return 0; return 0;
} }
static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
int code = 0; int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname); sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) { if (s3GetObjectToFile(buf, dstName) != 0) {
@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
return code; return code;
} }
UPLOAD_TYPE getUploadType() { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC; return DATA_UPLOAD_RSYNC;
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
return UPLOAD_S3; return DATA_UPLOAD_S3;
} else { } else {
return UPLOAD_DISABLE; return DATA_UPLOAD_DISABLE;
} }
} }
int uploadCheckpoint(char* id, char* path) { int32_t streamTaskBackupCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("uploadCheckpoint parameters invalid"); stError("streamTaskBackupCheckpoint parameters invalid");
return -1; return -1;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) {
} }
// fileName: CURRENT // fileName: CURRENT
int downloadCheckpointByName(char* id, char* fname, char* dstName) { int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("uploadCheckpointByName parameters invalid"); stError("uploadCheckpointByName parameters invalid");
return -1; return -1;
@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) {
return 0; return 0;
} }
int downloadCheckpoint(char* id, char* path) { int32_t downloadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid"); stError("downloadCheckpoint parameters invalid");
return -1; return -1;
@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) {
return 0; return 0;
} }
int deleteCheckpoint(char* id) { int32_t deleteCheckpoint(char* id) {
if (id == NULL || strlen(id) == 0) { if (id == NULL || strlen(id) == 0) {
stError("deleteCheckpoint parameters invalid"); stError("deleteCheckpoint parameters invalid");
return -1; return -1;
@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) {
return 0; return 0;
} }
int deleteCheckpointFile(char* id, char* name) { int32_t deleteCheckpointFile(char* id, char* name) {
char object[128] = {0}; char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name); snprintf(object, sizeof(object), "%s/%s", id, name);
char* tmp = object; char* tmp = object;

View File

@ -17,7 +17,6 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 10
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue