tcs/stream: use tcs interface instead of s3

This commit is contained in:
Minglei Jin 2024-10-11 15:57:47 +08:00
parent 6d6568c18f
commit 3f403569eb
2 changed files with 32 additions and 31 deletions

View File

@ -3,6 +3,7 @@ add_library(stream STATIC ${STREAM_SRC})
target_include_directories( target_include_directories(
stream stream
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream" PUBLIC "${TD_SOURCE_DIR}/include/libs/stream"
PUBLIC "${TD_SOURCE_DIR}/include/libs/tcs"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "rsync.h" #include "rsync.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
#include "tcs.h"
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t deleteCheckpointFile(const char* id, const char* name);
@ -343,7 +343,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint // And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
// procedure to be stucked. // procedure to be stucked.
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) { if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
@ -351,7 +351,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor"); "trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId; pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else { // already launched, do nothing } else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
} }
} }
@ -372,10 +372,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
} else { // only one task exists, no need to dispatch downstream info } else { // only one task exists, no need to dispatch downstream info
code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, code =
-1); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -398,8 +398,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (taskLevel == TASK_LEVEL__SINK) { if (taskLevel == TASK_LEVEL__SINK) {
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num); stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
} else { // source & agg tasks need to forward the checkpoint msg downwards } else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) { if (code) {
@ -444,7 +444,7 @@ static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t
.transId = pInfo->transId, .transId = pInfo->transId,
.streamId = streamId, .streamId = streamId,
.downstreamNodeId = downstreamNodeId}; .downstreamNodeId = downstreamNodeId};
void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
if (p == NULL) { if (p == NULL) {
stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno)); stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
return terrno; return terrno;
@ -559,8 +559,8 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
} }
streamMutexUnlock(&pInfo->lock); streamMutexUnlock(&pInfo->lock);
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%"PRId64", current checkpointId:%"PRId64, stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
} }
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
@ -574,8 +574,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
if (pReq->checkpointId <= pInfo->checkpointId) { if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
" no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
" transId:%d ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
pReq->transId); pReq->transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
@ -622,7 +621,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
} }
bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer); pInfo->processedVer <= pReq->checkpointVer);
if (!valid) { if (!valid) {
stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
@ -907,7 +906,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d", ", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref); id, vgId, pTmrInfo->launchChkptId, ref);
return -1; return -1;
} }
@ -1004,7 +1003,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
int32_t numOfNotSend = 0; int32_t numOfNotSend = 0;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
@ -1022,7 +1021,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
} }
if (++pTmrInfo->activeCounter < 50) { if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
return; return;
} }
@ -1200,8 +1200,8 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) { // pause the stream task, if memory not enough if (px == NULL) { // pause the stream task, if memory not enough
code = terrno; code = terrno;
} }
} else { } else {
@ -1212,8 +1212,8 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
} }
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) { // pause the stream task, if memory not enough if (px == NULL) { // pause the stream task, if memory not enough
code = terrno; code = terrno;
break; break;
} }
@ -1287,11 +1287,11 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
static int32_t uploadCheckpointToS3(const char* id, const char* path) { static int32_t uploadCheckpointToS3(const char* id, const char* path) {
int32_t code = 0; int32_t code = 0;
int32_t nBytes = 0; int32_t nBytes = 0;
/*
if (s3Init() != 0) { if (s3Init() != 0) {
return TSDB_CODE_THIRDPARTY_ERROR; return TSDB_CODE_THIRDPARTY_ERROR;
} }
*/
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) { if (pDir == NULL) {
return terrno; return terrno;
@ -1324,11 +1324,11 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
break; break;
} }
code = s3PutObjectFromFile2(filename, object, 0); code = tcsPutObjectFromFile2(filename, object, 0);
if (code != 0) { if (code != 0) {
stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code)); stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
} else { } else {
stDebug("[s3] upload checkpoint:%s", filename); stDebug("[tcs] upload checkpoint:%s", filename);
} }
} }
@ -1354,7 +1354,7 @@ int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char
taosMemoryFree(buf); taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_RANGE; return TSDB_CODE_OUT_OF_RANGE;
} }
int32_t code = s3GetObjectToFile(buf, dstName); int32_t code = tcsGetObjectToFile(buf, dstName);
if (code != 0) { if (code != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
@ -1417,7 +1417,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t che
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return downloadByRsync(id, path, checkpointId); return downloadByRsync(id, path, checkpointId);
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path); return tcsGetObjectsByPrefix(id, path);
} }
return 0; return 0;
@ -1431,7 +1431,7 @@ int32_t deleteCheckpoint(const char* id) {
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return deleteRsync(id); return deleteRsync(id);
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
s3DeleteObjectsByPrefix(id); tcsDeleteObjectsByPrefix(id);
} }
return 0; return 0;
} }
@ -1445,7 +1445,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
} }
char* tmp = object; char* tmp = object;
int32_t code = s3DeleteObjects((const char**)&tmp, 1); int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
if (code != 0) { if (code != 0) {
return TSDB_CODE_THIRDPARTY_ERROR; return TSDB_CODE_THIRDPARTY_ERROR;
} }
@ -1487,4 +1487,4 @@ int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
return code; return code;
} }