From 3f403569eb7de14c59c4a6f280a4ded70ade2a76 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 11 Oct 2024 15:57:47 +0800 Subject: [PATCH] tcs/stream: use tcs interface instead of s3 --- source/libs/stream/CMakeLists.txt | 1 + source/libs/stream/src/streamCheckpoint.c | 62 +++++++++++------------ 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index b63a8b3900..f08b16f836 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(stream STATIC ${STREAM_SRC}) target_include_directories( stream PUBLIC "${TD_SOURCE_DIR}/include/libs/stream" + PUBLIC "${TD_SOURCE_DIR}/include/libs/tcs" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e44bca123b..3b730e40cc 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include "cos.h" #include "rsync.h" #include "streamBackendRocksdb.h" #include "streamInt.h" +#include "tcs.h" static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); @@ -343,7 +343,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint // procedure to be stucked. SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; - int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); + int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); @@ -351,7 +351,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; - } else { // already launched, do nothing + } else { // already launched, do nothing stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); } } @@ -372,10 +372,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); - code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure + code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure } else { // only one task exists, no need to dispatch downstream info - code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, - -1); + code = + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -398,8 +398,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (taskLevel == TASK_LEVEL__SINK) { stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num); streamFreeQitem((SStreamQueueItem*)pBlock); - code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet - } else { // source & agg tasks need to forward the checkpoint msg downwards + code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet + } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); if (code) { @@ -444,7 +444,7 @@ static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t .transId = pInfo->transId, .streamId = streamId, .downstreamNodeId = downstreamNodeId}; - void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); if (p == NULL) { stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno)); return terrno; @@ -559,8 +559,8 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } streamMutexUnlock(&pInfo->lock); - stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%"PRId64", current checkpointId:%"PRId64, - pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); + stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64, + pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { @@ -574,8 +574,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 - " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 - " transId:%d ignored", + " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, pReq->transId); streamMutexUnlock(&pTask->lock); @@ -622,7 +621,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && - pInfo->processedVer <= pReq->checkpointVer); + pInfo->processedVer <= pReq->checkpointVer); if (!valid) { stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 @@ -907,7 +906,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 - ", quit, ref:%d", + ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); return -1; } @@ -1004,7 +1003,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { int32_t numOfNotSend = 0; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); @@ -1022,7 +1021,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } if (++pTmrInfo->activeCounter < 50) { - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); return; } @@ -1200,8 +1200,8 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { // pause the stream task, if memory not enough + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + if (px == NULL) { // pause the stream task, if memory not enough code = terrno; } } else { @@ -1212,8 +1212,8 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { } STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { // pause the stream task, if memory not enough + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + if (px == NULL) { // pause the stream task, if memory not enough code = terrno; break; } @@ -1287,11 +1287,11 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { static int32_t uploadCheckpointToS3(const char* id, const char* path) { int32_t code = 0; int32_t nBytes = 0; - + /* if (s3Init() != 0) { return TSDB_CODE_THIRDPARTY_ERROR; } - + */ TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) { return terrno; @@ -1324,11 +1324,11 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { break; } - code = s3PutObjectFromFile2(filename, object, 0); + code = tcsPutObjectFromFile2(filename, object, 0); if (code != 0) { - stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code)); + stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code)); } else { - stDebug("[s3] upload checkpoint:%s", filename); + stDebug("[tcs] upload checkpoint:%s", filename); } } @@ -1354,7 +1354,7 @@ int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char taosMemoryFree(buf); return TSDB_CODE_OUT_OF_RANGE; } - int32_t code = s3GetObjectToFile(buf, dstName); + int32_t code = tcsGetObjectToFile(buf, dstName); if (code != 0) { taosMemoryFree(buf); return TAOS_SYSTEM_ERROR(errno); @@ -1417,7 +1417,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t che if (strlen(tsSnodeAddress) != 0) { return downloadByRsync(id, path, checkpointId); } else if (tsS3StreamEnabled) { - return s3GetObjectsByPrefix(id, path); + return tcsGetObjectsByPrefix(id, path); } return 0; @@ -1431,7 +1431,7 @@ int32_t deleteCheckpoint(const char* id) { if (strlen(tsSnodeAddress) != 0) { return deleteRsync(id); } else if (tsS3StreamEnabled) { - s3DeleteObjectsByPrefix(id); + tcsDeleteObjectsByPrefix(id); } return 0; } @@ -1445,7 +1445,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } char* tmp = object; - int32_t code = s3DeleteObjects((const char**)&tmp, 1); + int32_t code = tcsDeleteObjects((const char**)&tmp, 1); if (code != 0) { return TSDB_CODE_THIRDPARTY_ERROR; } @@ -1487,4 +1487,4 @@ int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); return code; -} \ No newline at end of file +}