/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
#include "cos.h"
|
|
#include "rsync.h"
|
|
#include "streamBackendRocksdb.h"
|
|
#include "streamInt.h"
|
|
|
|
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
|
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
|
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
|
|
static int32_t deleteCheckpoint(const char* id);
|
|
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
|
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
|
|
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
|
|
int32_t transId, int32_t srcTaskId);
|
|
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
|
|
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
|
|
|
|
/**
 * Allocate a queue item of the given checkpoint type that carries exactly one STREAM_CHECKPOINT data block.
 *
 * The data block stores the checkpointId in info.version and the transId in both ends of info.window.
 * For a checkpoint-trigger created on a non-source task the srcTaskId is recorded in the queue item.
 *
 * @param pTask          task that owns the item (its level and selfChildId are read)
 * @param checkpointType value stored in the item's type field
 * @param checkpointId   checkpoint id stored in the data block
 * @param transId        transaction id stored in the data block
 * @param srcTaskId      upstream task id, only used for trigger items on non-source tasks
 * @param pRes           receives the new item on success; untouched on failure
 * @return TSDB_CODE_SUCCESS, the error of taosAllocateQitem(), or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pItem = NULL;

  int32_t ret = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pItem);
  if (ret) {
    return ret;
  }

  pItem->type = checkpointType;

  bool triggerFromUpstream =
      (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER) && (pTask->info.taskLevel != TASK_LEVEL__SOURCE);
  if (triggerFromUpstream) {
    pItem->srcTaskId = srcTaskId;
    ASSERT(srcTaskId != 0);
  }

  // temporary template; its content is copied into the array, so it is released before returning
  SSDataBlock* pTemplate = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pTemplate == NULL) {
    taosFreeQitem(pItem);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTemplate->info.type = STREAM_CHECKPOINT;
  pTemplate->info.version = checkpointId;
  pTemplate->info.window.ekey = pTemplate->info.window.skey = transId;  // NOTE: set the transId
  pTemplate->info.rows = 1;
  pTemplate->info.childId = pTask->info.selfChildId;

  pItem->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pItem->blocks == NULL) {
    taosMemoryFree(pTemplate);
    taosFreeQitem(pItem);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pItem->blocks, pTemplate) == NULL) {
    taosArrayDestroy(pItem->blocks);
    taosMemoryFree(pTemplate);
    taosFreeQitem(pItem);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pRes = pItem;

  taosMemoryFree(pTemplate);
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
// this message must be put into inputq successfully, continue retrying until it succeeds
|
|
// todo must be success
|
|
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
|
|
int32_t srcTaskId) {
|
|
SStreamDataBlock* pCheckpoint = NULL;
|
|
int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
return code;
|
|
}
|
|
|
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
}
|
|
|
|
return streamTrySchedExec(pTask);
|
|
}
|
|
|
|
/**
 * Start the checkpoint procedure on a source task for the given checkpoint-source request.
 *
 * @return TSDB_CODE_INVALID_MSG when the task is not a source task, the error of the gen-checkpoint
 *         state transition, or the result of appending the checkpoint-trigger item to the inputQ.
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  // this request is only valid for source-level tasks
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_INVALID_MSG;
  }

  // todo this status may not be set here.
  // Step 1: switch the task into the checkpoint state, after which no data are allowed to enter the inputQ.
  int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return ret;
  }

  // record the identity and the start time of the checkpoint that is now active
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // Step 2: append the checkpoint-trigger item as the last element of the inputQ, so that all blocks with a
  // smaller version are handled by this task before the trigger is processed.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
|
|
|
|
/**
 * Handle the response of a "retrieve checkpoint-trigger" request sent to an upstream task.
 *
 * A response carrying a failure code is only logged and reported as success. For a valid response a
 * checkpoint-trigger item, tagged with the upstream task id, is appended to this task's inputQ.
 *
 * @param pTask task that receives the response; must not be a source task
 * @param pRsp  response message (rspCode, upstreamTaskId, checkpointId and transId are read)
 * @return TSDB_CODE_INVALID_MSG for source tasks, TSDB_CODE_SUCCESS when the response is discarded,
 *         otherwise the result of appending the trigger item (failures are logged and propagated
 *         instead of being silently dropped).
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr,
            pRsp->upstreamTaskId, tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId,
                                            pRsp->transId, pRsp->upstreamTaskId);
  if (code != TSDB_CODE_SUCCESS) {
    // the trigger could not be queued: report it, the caller decides how to recover
    stError("s-task:%s failed to append checkpoint-trigger from upstream:0x%x into inputQ, checkpointId:%" PRId64
            ", code:%s",
            pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, tstrerror(code));
  }

  return code;
}
|
|
|
|
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
|
SRpcHandleInfo* pRpcInfo, int32_t code) {
|
|
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
|
|
|
|
void* pBuf = rpcMallocCont(size);
|
|
if (pBuf == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return terrno;
|
|
}
|
|
|
|
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
|
|
|
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
|
|
|
|
pRsp->streamId = pTask->id.streamId;
|
|
pRsp->upstreamTaskId = pTask->id.taskId;
|
|
pRsp->taskId = dstTaskId;
|
|
pRsp->rspCode = code;
|
|
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
|
|
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
|
|
} else {
|
|
pRsp->checkpointId = -1;
|
|
pRsp->transId = -1;
|
|
}
|
|
|
|
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
|
|
tmsgSendRsp(&rspMsg);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
 * Stamp the checkpoint-trigger block with this task as its source, put it into the outputQ and start
 * dispatching. If the block cannot be written into the outputQ it is released here.
 *
 * @return the error of taosWriteQitem(), or the result of streamDispatchStreamBlock()
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  int32_t ret = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (ret != 0) {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(ret));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return ret;
  }

  ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false);
  ret = streamDispatchStreamBlock(pTask);

  return ret;
}
|
|
|
|
/**
 * Process a checkpoint-trigger block taken from the inputQ.
 *
 * The checkpointId is read from info.version and the transId from info.window.skey of the first data block.
 * Under pTask->lock the trigger is validated first:
 *  - older than the task's checkpointId: discarded;
 *  - equal to the task's checkpointId (already finished): a checkpoint-ready msg is sent to the upstream
 *    task again, its input is re-opened, and the block is discarded;
 *  - task already in checkpoint status: discarded when the id differs from the active one, when all
 *    upstream triggers were already received, or (sink/agg) when the same upstream already sent one.
 * Otherwise the task is moved into checkpoint status (starting the trigger monitor timer) and the block is
 * handled according to the task level.
 *
 * Ownership: this function consumes pBlock. On every exit path the block has been released with
 * streamFreeQitem() or handed over to continueDispatchCheckpointTriggerBlock(); this includes the error
 * exits for a missing data block, a missing upstream ep-info and an invalid ready-msg entry, which
 * previously leaked the block.
 *
 * @return TSDB_CODE_SUCCESS when the block was handled or discarded, otherwise an error code
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t     checkpointId = pDataBlock->info.version;
  int32_t     transId = pDataBlock->info.window.skey;
  const char* id = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     taskLevel = pTask->info.taskLevel;

  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  streamMutexLock(&pTask->lock);
  if (pTask->chkInfo.checkpointId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
    streamMutexUnlock(&pTask->lock);

    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  if (pTask->chkInfo.checkpointId == checkpointId) {
    {  // send checkpoint-ready msg to upstream
      SRpcMsg                msg = {0};
      SStreamUpstreamEpInfo* pInfo = NULL;
      streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
      if (pInfo == NULL) {
        streamMutexUnlock(&pTask->lock);
        streamFreeQitem((SStreamQueueItem*)pBlock);  // the block is consumed on error as well
        return TSDB_CODE_STREAM_TASK_NOT_EXIST;
      }

      code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
      if (code == TSDB_CODE_SUCCESS) {
        (void)tmsgSendReq(&pInfo->epSet, &msg);
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the "
        "interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);

    streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    streamMutexUnlock(&pTask->lock);

    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    if (pActiveInfo->activeId != checkpointId) {
      stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
              " discard",
              id, vgId, pActiveInfo->activeId, checkpointId);
      streamMutexUnlock(&pTask->lock);

      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    } else {  // checkpointId == pActiveInfo->activeId
      if (pActiveInfo->allUpstreamTriggerRecv == 1) {
        stDebug(
            "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
            "checkpointId:%" PRId64 " transId:%d",
            id, vgId, checkpointId, transId);
        streamMutexUnlock(&pTask->lock);
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
        // check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
        for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
          STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
          if (p == NULL) {
            streamMutexUnlock(&pTask->lock);
            streamFreeQitem((SStreamQueueItem*)pBlock);  // the block is consumed on error as well
            return TSDB_CODE_INVALID_PARA;
          }

          if (p->upstreamTaskId == pBlock->srcTaskId) {
            stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
                   ", prev recvTs:%" PRId64 " discard",
                   pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);

            streamMutexUnlock(&pTask->lock);
            streamFreeQitem((SStreamQueueItem*)pBlock);
            return code;
          }
        }
      }
    }
  }

  streamMutexUnlock(&pTask->lock);

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // the monitor timer holds one timerActive count and one task reference
    int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
    streamMetaAcquireOneTask(pTask);

    if (pActiveInfo->pChkptTriggerTmr == NULL) {
      pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
    } else {
      streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId,
                     "trigger-recv-monitor");
    }
  }

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      (void)continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId,
                                       -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    // todo: handle this
    // update the child Id for downstream tasks
    (void)streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      (void)streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {  // source & agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);

      flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
|
|
|
|
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
|
|
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
|
|
int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
|
|
const char* id, int32_t* pNotReady, int32_t* pTransId) {
|
|
bool received = false;
|
|
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
|
for (int32_t i = 0; i < size; ++i) {
|
|
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
|
if (p == NULL) {
|
|
return TSDB_CODE_INVALID_PARA;
|
|
}
|
|
|
|
if (p->downstreamTaskId == downstreamTaskId) {
|
|
received = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (received) {
|
|
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
|
|
downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
|
|
numOfDownstream);
|
|
} else {
|
|
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
|
|
.downstreamTaskId = downstreamTaskId,
|
|
.checkpointId = pInfo->activeId,
|
|
.transId = pInfo->transId,
|
|
.streamId = streamId,
|
|
.downstreamNodeId = downstreamNodeId};
|
|
void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
|
|
if (p == NULL) {
|
|
stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
|
|
return terrno;
|
|
}
|
|
}
|
|
|
|
*pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
|
*pTransId = pInfo->transId;
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* All down stream tasks have successfully completed the check point task.
|
|
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
|
|
*/
|
|
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
|
|
int32_t downstreamTaskId) {
|
|
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
|
|
|
const char* id = pTask->id.idStr;
|
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
|
int32_t code = 0;
|
|
int32_t notReady = 0;
|
|
int32_t transId = 0;
|
|
|
|
// 1. not in checkpoint status now
|
|
SStreamTaskState pStat = streamTaskGetStatus(pTask);
|
|
if (pStat.state != TASK_STATUS__CK) {
|
|
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
|
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
|
}
|
|
|
|
// 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
|
|
if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
|
|
stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
|
|
") from task:0x%x, expired and discard",
|
|
id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
|
|
return TSDB_CODE_INVALID_MSG;
|
|
}
|
|
|
|
streamMutexLock(&pInfo->lock);
|
|
code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready,
|
|
&transId);
|
|
streamMutexUnlock(&pInfo->lock);
|
|
|
|
if ((notReady == 0) && (code == 0)) {
|
|
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
|
|
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
|
|
}
|
|
|
|
return code;
|
|
}
|
|
|
|
/**
 * Handle the confirmation that an upstream task received our checkpoint-ready msg.
 *
 * Under the active-info lock, the first entry of pReadyMsgList that matches (upstreamTaskId, checkpointId)
 * is marked as sendCompleted; afterwards the number of confirmed entries is counted and logged.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                ts = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  // pass 1: mark the matching entry as confirmed
  for (int32_t k = 0; k < taosArrayGetSize(pActive->pReadyMsgList); ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, k);
      continue;
    }

    if (pEntry->upstreamTaskId != upstreamTaskId || pEntry->checkpointId != checkpointId) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            pTask->id.idStr, upstreamTaskId, checkpointId, ts);
    break;
  }

  // pass 2: count all confirmed entries
  for (int32_t k = 0; k < taosArrayGetSize(pActive->pReadyMsgList); ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, k);
      continue;
    }

    if (pEntry->sendCompleted == 1) {
      confirmed++;
    }
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
          confirmed, checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/**
 * Reset the per-checkpoint bookkeeping of the task: clear the recorded start time, re-open the inputQ for
 * all upstream tasks, and clear the active checkpoint info under its lock.
 *
 * @param clearChkpReadyMsg when true, the checkpoint-ready msg list is cleared as well
 */
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  pTask->chkInfo.startTs = 0;
  streamTaskOpenAllUpstreamInput(pTask);

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  streamMutexLock(&pActive->lock);
  streamTaskClearActiveInfo(pActive);
  if (clearChkpReadyMsg) {
    streamClearChkptReadyMsg(pActive);
  }
  streamMutexUnlock(&pActive->lock);
}
|
|
|
|
/**
 * Apply an "update checkpoint info" request to the task and persist the result.
 *
 * If the requested checkpointId is not newer than the one already recorded, nothing is updated; only the
 * related fill-history task is dropped (when requested) and the meta is committed.
 * Otherwise checkpointId/checkpointVer/checkpointTime are updated, the active checkpoint info is cleared,
 * the checkpoint-done event is raised if the task is in checkpoint status, the task is marked READY and
 * saved, the related fill-history task is dropped (when requested) and the meta is committed.
 *
 * Locking: pTask->lock is taken internally. The meta write lock is released and re-acquired around the
 * unregistering of the fill-history task, so the function is presumably entered with that lock held -
 * NOTE(review): confirm against the callers.
 *
 * @param pTask    task to update
 * @param restored only selects which debug message is printed (restore vs. normal procedure)
 * @param pReq     update request
 * @return TSDB_CODE_SUCCESS, or the error of streamMetaSaveTask()
 */
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  streamMutexLock(&pTask->lock);

  if (pReq->checkpointId <= pInfo->checkpointId) {
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " transId:%d ignored",
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
            pReq->transId);
    streamMutexUnlock(&pTask->lock);

    {  // destroy the related fill-history tasks
      // drop task should not in the meta-lock, and drop the related fill-history task now
      streamMetaWUnLock(pMeta);
      if (pReq->dropRelHTask) {
        (void)streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
        int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
        // report the id of the dropped fill-history task, not the id of the task that is being updated
        stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
                id, vgId, (int32_t)pReq->hTaskId, numOfTasks);
      }
      streamMetaWLock(pMeta);
      if (streamMetaCommit(pMeta) < 0) {
        // persist to disk; a failure is currently ignored
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  if (!restored) {  // during restore procedure, do update checkpoint-info
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  } else {  // not in restore status, must be in checkpoint status
    stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  }

  ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
         pInfo->processedVer <= pReq->checkpointVer);

  pInfo->checkpointId = pReq->checkpointId;
  pInfo->checkpointVer = pReq->checkpointVer;
  pInfo->checkpointTime = pReq->checkpointTs;

  streamTaskClearCheckInfo(pTask, true);

  if (pStatus.state == TASK_STATUS__CK) {
    // todo handle error
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    if (code != TSDB_CODE_SUCCESS) {
      // the result is overwritten by streamMetaSaveTask() below, so at least leave a trace of the failure
      stError("s-task:%s vgId:%d failed to handle checkpoint-done event, code:%s", id, vgId, tstrerror(code));
    }
  } else {
    stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
  }

  if (pReq->dropRelHTask) {
    stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
            pReq->taskId, vgId, pReq->hTaskId);
    CLEAR_RELATED_FILLHISTORY_TASK(pTask);
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTask(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, terrstr());
    return code;
  }

  streamMetaWUnLock(pMeta);

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    (void)streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
            (int32_t)pReq->hTaskId, numOfTasks);
  }

  streamMetaWLock(pMeta);

  if (streamMetaCommit(pMeta) < 0) {
    // persist to disk; a failure is currently ignored
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/**
 * Remember the currently active checkpoint id as the failed one.
 * Nothing is recorded (a warning is logged instead) when no checkpoint is active, i.e. activeId <= 0.
 */
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  if (pActive->activeId > 0) {
    pActive->failedId = pActive->activeId;
    stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pActive->activeId,
            pActive->transId);
    return;
  }

  stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
}
|
|
|
|
/**
 * Download the remote "META" file of a checkpoint into "<path>/META_TMP" and pass the downloaded file to
 * remoteChkpGetDelFile() together with `list`.
 * NOTE(review): remoteChkpGetDelFile() presumably appends the names of remote files to delete - confirm.
 *
 * The temporary path buffer is released on every exit path, and a failure of remoteChkpGetDelFile() is
 * reported to the caller instead of being turned into success.
 *
 * @param id   task id string, used for logging and for the download
 * @param path local checkpoint directory; must not be NULL
 * @param list array handed to remoteChkpGetDelFile()
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_OUT_OF_RANGE if the path does not fit, or the
 *         error of the download / of remoteChkpGetDelFile()
 */
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;  // room for the separator and the "META_TMP" suffix

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _end;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    goto _end;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

_end:
  taosMemoryFree(filePath);
  return code;
}
|
|
|
|
/**
 * Generate the upload data of a checkpoint, upload it to the remote storage and remove the remote files
 * that became redundant.
 *
 * Steps (each one only runs while the previous ones succeeded):
 *  1. taskDbGenChkpUploadData() produces the local path and the initial list of files to delete;
 *  2. for DATA_UPLOAD_S3, getCheckpointDataMeta() is run with the same list;
 *  3. streamTaskUploadCheckpoint() uploads the data found at the path;
 *  4. every file of the list is removed with deleteCheckpointFile(), stopping at the first failure.
 * When everything succeeded the local checkpoint directory is removed, otherwise it is kept.
 *
 * @param dbRefId not used by this function
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of the first failing step
 */
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  int32_t      code = 0;
  char*        path = NULL;
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      startMs = taosGetTimestampMs();

  SArray* pDelList = taosArrayInit(4, POINTER_BYTES);
  if (pDelList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // step 1: generate the data to upload
  code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, pDelList,
                                 pTask->id.idStr);
  if (code != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  // step 2: S3 only
  if (type == DATA_UPLOAD_S3 && code == TSDB_CODE_SUCCESS) {
    code = getCheckpointDataMeta(idStr, path, pDelList);
    if (code != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  // step 3: upload
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, path);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
              tstrerror(code));
    } else {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    }
  }

  // step 4: remove the redundant remote files
  if (code == TSDB_CODE_SUCCESS) {
    int32_t numOfFiles = taosArrayGetSize(pDelList);
    stDebug("s-task:%s remove redundant %d files", idStr, numOfFiles);

    int idx = 0;
    while (idx < numOfFiles) {
      char* pName = taosArrayGetP(pDelList, idx);
      code = deleteCheckpointFile(idStr, pName);
      if (code != 0) {
        stDebug("s-task:%s failed to remove file: %s", idStr, pName);
        break;
      }
      idx++;
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(pDelList, taosMemoryFree);
  double elapsed = (taosGetTimestampMs() - startMs) / 1000.0;

  if (code != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
            checkpointId, elapsed);
  } else {
    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
            idStr, checkpointId, elapsed, path);
    taosRemoveDir(path);
  }

  taosMemoryFree(path);
  return code;
}
|
|
|
|
/**
 * Upload the data of the given checkpoint to the configured remote backup, if uploading is enabled.
 *
 * The NULL check of pTask is done before anything is read from it (it used to come after the task id had
 * already been dereferenced for logging). The backend db is acquired through its ref id for the duration
 * of the upload and released afterwards.
 *
 * @return 0 when there is nothing to do (NULL task, upload disabled, no backend), -1 when the db cannot be
 *         acquired, otherwise the result of uploadCheckpointData()
 */
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  if (taskAcquireDb(dbRefId) == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  // pass on the ref id that was actually acquired above
  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}
|
|
|
|
/**
 * Build the checkpoint of the task for the currently active checkpoint id.
 *
 * Steps (each one only runs while the previous ones succeeded):
 *  1. non-sink tasks generate the checkpoint in the state backend;
 *  2. the checkpoint response is sent (checkpoint-source rsp for source tasks, checkpoint-ready msg otherwise);
 *  3. the checkpoint data is backed up remotely;
 *  4. the checkpoint-report msg is sent, if the task has a msg callback.
 * If any of steps 1-3 failed, the active checkpoint id is recorded as failed and the checkpoint-done event
 * is raised to leave the checkpoint status.
 *
 * The error of the failing step is what is logged and returned; the result of the clean-up event no longer
 * replaces it, so a failed checkpoint cannot be reported as "succ".
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing step
 */
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  bool         dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  SStreamMeta* pMeta = pTask->pMeta;

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
    }
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
  } else {  // clear the checkpoint info if failed
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask);  // set failed checkpoint id before clear the checkpoint info
    streamMutexUnlock(&pTask->lock);

    // keep `code` untouched: it carries the reason why the checkpoint failed
    int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    if (ret != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to handle checkpoint-done event after failed checkpoint, checkpointId:%" PRId64
              ", code:%s",
              id, ckId, tstrerror(ret));
    }
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
         (code == TSDB_CODE_SUCCESS) ? "succ" : "failed");

  return code;
}
|
|
|
|
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|
SStreamTask* pTask = param;
|
|
int32_t vgId = pTask->pMeta->vgId;
|
|
int64_t now = taosGetTimestampMs();
|
|
const char* id = pTask->id.idStr;
|
|
|
|
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
|
|
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
|
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
return;
|
|
}
|
|
|
|
// check the status every 100ms
|
|
if (streamTaskShouldStop(pTask)) {
|
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
return;
|
|
}
|
|
|
|
if (++pActiveInfo->checkCounter < 100) {
|
|
streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor");
|
|
return;
|
|
}
|
|
|
|
pActiveInfo->checkCounter = 0;
|
|
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
|
|
|
|
streamMutexLock(&pTask->lock);
|
|
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
|
if (pState.state != TASK_STATUS__CK) {
|
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
|
|
|
streamMutexUnlock(&pTask->lock);
|
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
return;
|
|
}
|
|
|
|
// checkpoint-trigger recv flag is set, quit
|
|
if (pActiveInfo->allUpstreamTriggerRecv) {
|
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
|
|
ref);
|
|
|
|
streamMutexUnlock(&pTask->lock);
|
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
return;
|
|
}
|
|
|
|
streamMutexUnlock(&pTask->lock);
|
|
|
|
streamMutexLock(&pActiveInfo->lock);
|
|
|
|
// send msg to retrieve checkpoint trigger msg
|
|
SArray* pList = pTask->upstreamInfo.pList;
|
|
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
|
if (pNotSendList == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
|
streamMutexUnlock(&pActiveInfo->lock);
|
|
return;
|
|
}
|
|
|
|
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
|
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
|
|
|
|
bool recved = false;
|
|
for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
|
|
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
|
|
if (pReady == NULL) {
|
|
continue;
|
|
}
|
|
|
|
if (pInfo->nodeId == pReady->upstreamNodeId) {
|
|
recved = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
|
|
streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
|
|
(void)taosArrayPush(pNotSendList, pInfo);
|
|
}
|
|
}
|
|
|
|
// do send retrieve checkpoint trigger msg to upstream
|
|
int32_t size = taosArrayGetSize(pNotSendList);
|
|
(void)doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
|
streamMutexUnlock(&pActiveInfo->lock);
|
|
|
|
// check every 100ms
|
|
if (size > 0) {
|
|
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
|
streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor");
|
|
} else {
|
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
|
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
}
|
|
|
|
taosArrayDestroy(pNotSendList);
|
|
}
|
|
|
|
/**
 * Send a checkpoint-trigger retrieve request to every upstream task in pNotSendList.
 *
 * A failure for one upstream does not stop the loop; the remaining upstream tasks are still
 * contacted. The first failure is remembered and returned, so a later successful send can no
 * longer hide it.
 *
 * @param pTask         the downstream task that is missing checkpoint-trigger blocks
 * @param pNotSendList  array of SStreamUpstreamEpInfo, one per upstream task to ask
 * @return 0 if the list is empty or every request was sent; TSDB_CODE_INVALID_PARA (returned
 *         immediately) if a list slot cannot be read; otherwise the first allocation or send error
 */
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
    if (pReq == NULL) {
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
      stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
      continue;
    }

    pReq->head.vgId = htonl(pUpstreamTask->nodeId);
    pReq->streamId = pTask->id.streamId;
    pReq->downstreamTaskId = pTask->id.taskId;
    pReq->downstreamNodeId = vgId;
    pReq->upstreamTaskId = pUpstreamTask->taskId;
    pReq->upstreamNodeId = pUpstreamTask->nodeId;
    pReq->checkpointId = checkpointId;

    SRpcMsg rpcMsg = {0};
    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));

    // keep the per-request result separate so it cannot overwrite an earlier failure
    int32_t sendCode = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (sendCode == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
      if (code == TSDB_CODE_SUCCESS) {
        code = sendCode;
      }
    }
  }

  return code;
}
|
|
|
|
/**
 * Tell whether a checkpoint-trigger has already been dispatched to the given downstream node.
 *
 * Returns false when the task is not in checkpoint status, when no trigger has been dispatched
 * yet, or when the dispatch list has no entry for downstreamNodeId. When an entry is found, a
 * warning with the send time is logged and true is returned.
 *
 * @param pTask             the task that dispatches the checkpoint-trigger
 * @param downstreamNodeId  node id (vgId) of the downstream to look up
 */
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                curTs = taosGetTimestampMs();
  const char*            idStr = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       status = streamTaskGetStatus(pTask);

  if (status.state != TASK_STATUS__CK) {
    return false;
  }

  bool found = false;

  streamMutexLock(&pActive->lock);

  if (pActive->dispatchTrigger) {
    int32_t total = taosArrayGetSize(pActive->pDispatchTriggerList);

    for (int32_t idx = 0; idx < total; ++idx) {
      STaskTriggerSendInfo* pEntry = taosArrayGet(pActive->pDispatchTriggerList, idx);
      if (pEntry == NULL) {
        stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", idStr, idx,
                total);
        continue;
      }

      if (pEntry->nodeId == downstreamNodeId) {
        // has send trigger msg to downstream node,
        double elapsed = (curTs - pEntry->sendTs) / 1000.0;

        if (pEntry->recved) {
          stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
                 "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
                 idStr, pEntry->sendTs, elapsed, pEntry->taskId, pActive->activeId, pActive->transId);
        } else {
          stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
                 ", transId:%d",
                 idStr, pEntry->sendTs, elapsed, pActive->activeId, pActive->transId);
        }

        found = true;
        break;
      }
    }
  }

  streamMutexUnlock(&pActive->lock);
  return found;
}
|
|
|
|
/**
 * Report how many checkpoint-trigger messages have been received versus how many are expected.
 *
 * @param pTask    the task to inspect
 * @param pRecved  out: number of entries currently in pReadyMsgList
 * @param pTotal   out: 1 for a source task, otherwise the number of upstream tasks
 */
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  SArray* pReadyList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
  *pRecved = taosArrayGetSize(pReadyList);

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  *pTotal = isSource ? 1 : streamTaskGetNumOfUpstream(pTask);
}
|
|
|
|
// record the dispatch checkpoint trigger info in the list
|
|
// memory insufficient may cause the stream computing stopped
|
|
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
|
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
|
|
|
int64_t now = taosGetTimestampMs();
|
|
streamMutexLock(&pInfo->lock);
|
|
|
|
// outputQ should be empty here
|
|
ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0);
|
|
|
|
pInfo->dispatchTrigger = true;
|
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
|
|
|
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
|
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
|
if (px == NULL) {
|
|
// pause the stream task, if memory not enough
|
|
}
|
|
} else {
|
|
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
|
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
|
if (pVgInfo == NULL) {
|
|
continue;
|
|
}
|
|
|
|
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
|
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
|
if (px == NULL) {
|
|
// pause the stream task, if memory not enough
|
|
}
|
|
}
|
|
}
|
|
|
|
streamMutexUnlock(&pInfo->lock);
|
|
}
|
|
|
|
/**
 * Count the entries of pDispatchTriggerList whose recved flag is set.
 * The list is scanned while holding the active-checkpoint-info lock.
 *
 * @param pTask  the task whose dispatch list is inspected
 * @return number of entries with recved set
 */
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  int32_t total = taosArrayGetSize(pActive->pDispatchTriggerList);
  for (int32_t idx = 0; idx < total; ++idx) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pActive->pDispatchTriggerList, idx);
    if (pEntry != NULL && pEntry->recved) {
      confirmed += 1;
    }
  }

  streamMutexUnlock(&pActive->lock);
  return confirmed;
}
|
|
|
|
/**
 * Mark the dispatch-list entry of the given downstream node as confirmed.
 *
 * The first entry whose nodeId equals vgId gets recved=true and recvTs set to the current time;
 * the entry is asserted not to be confirmed already. If no entry matches (the found task id
 * stays 0) an error is logged, otherwise the confirmed/total progress is logged.
 *
 * @param pTask  the task that dispatched the checkpoint-trigger
 * @param vgId   node id of the downstream that confirmed the trigger
 */
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int32_t                matchedTaskId = 0;

  streamMutexLock(&pActive->lock);

  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pDispatchTriggerList); ++idx) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pActive->pDispatchTriggerList, idx);
    if (pEntry == NULL || pEntry->nodeId != vgId) {
      continue;
    }

    ASSERT(pEntry->recved == false);

    pEntry->recved = true;
    pEntry->recvTs = taosGetTimestampMs();
    matchedTaskId = pEntry->taskId;
    break;
  }

  streamMutexUnlock(&pActive->lock);

  // computed after the lock is dropped: streamTaskGetNumOfConfirmed takes the same lock itself
  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
  int32_t total = streamTaskGetNumOfDownstream(pTask);

  if (matchedTaskId != 0) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, matchedTaskId, vgId, numOfConfirmed, total);
  } else {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  }
}
|
|
|
|
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
|
int32_t code = 0;
|
|
int32_t nBytes = 0;
|
|
|
|
if (s3Init() != 0) {
|
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
|
}
|
|
|
|
TdDirPtr pDir = taosOpenDir(path);
|
|
if (pDir == NULL) {
|
|
return TAOS_SYSTEM_ERROR(errno);
|
|
}
|
|
|
|
TdDirEntryPtr de = NULL;
|
|
while ((de = taosReadDir(pDir)) != NULL) {
|
|
char* name = taosGetDirEntryName(de);
|
|
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
|
|
|
|
char filename[PATH_MAX] = {0};
|
|
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
|
|
nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
|
|
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
|
|
code = TSDB_CODE_OUT_OF_RANGE;
|
|
break;
|
|
}
|
|
} else {
|
|
nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
|
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
|
|
code = TSDB_CODE_OUT_OF_RANGE;
|
|
break;
|
|
}
|
|
}
|
|
|
|
char object[PATH_MAX] = {0};
|
|
nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
|
if (nBytes <= 0 || nBytes >= sizeof(object)) {
|
|
code = TSDB_CODE_OUT_OF_RANGE;
|
|
break;
|
|
}
|
|
|
|
code = s3PutObjectFromFile2(filename, object, 0);
|
|
if (code != 0) {
|
|
stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
|
|
} else {
|
|
stDebug("[s3] upload checkpoint:%s", filename);
|
|
}
|
|
}
|
|
|
|
(void) taosCloseDir(&pDir);
|
|
return code;
|
|
}
|
|
|
|
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
|
|
int32_t nBytes;
|
|
int32_t cap = strlen(id) + strlen(dstName) + 16;
|
|
|
|
char* buf = taosMemoryCalloc(1, cap);
|
|
if (buf == NULL) {
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
}
|
|
|
|
nBytes = snprintf(buf, cap, "%s/%s", id, fname);
|
|
if (nBytes <= 0 || nBytes >= cap) {
|
|
taosMemoryFree(buf);
|
|
return TSDB_CODE_OUT_OF_RANGE;
|
|
}
|
|
int32_t code = s3GetObjectToFile(buf, dstName);
|
|
if (code != 0) {
|
|
taosMemoryFree(buf);
|
|
return TAOS_SYSTEM_ERROR(errno);
|
|
}
|
|
taosMemoryFree(buf);
|
|
return 0;
|
|
}
|
|
|
|
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
|
if (strlen(tsSnodeAddress) != 0) {
|
|
return DATA_UPLOAD_RSYNC;
|
|
} else if (tsS3StreamEnabled) {
|
|
return DATA_UPLOAD_S3;
|
|
} else {
|
|
return DATA_UPLOAD_DISABLE;
|
|
}
|
|
}
|
|
|
|
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
|
int32_t code = 0;
|
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
|
stError("invalid parameters in upload checkpoint, %s", id);
|
|
return TSDB_CODE_INVALID_CFG;
|
|
}
|
|
|
|
if (strlen(tsSnodeAddress) != 0) {
|
|
code = uploadByRsync(id, path);
|
|
if (code != 0) {
|
|
return TAOS_SYSTEM_ERROR(errno);
|
|
}
|
|
} else if (tsS3StreamEnabled) {
|
|
return uploadCheckpointToS3(id, path);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// fileName: CURRENT
|
|
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
|
|
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
|
stError("down load checkpoint data parameters invalid");
|
|
return TSDB_CODE_INVALID_PARA;
|
|
}
|
|
|
|
if (strlen(tsSnodeAddress) != 0) {
|
|
return 0;
|
|
} else if (tsS3StreamEnabled) {
|
|
return downloadCheckpointByNameS3(id, fname, dstName);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
|
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
|
stError("down checkpoint data parameters invalid");
|
|
return -1;
|
|
}
|
|
|
|
if (strlen(tsSnodeAddress) != 0) {
|
|
return downloadRsync(id, path);
|
|
} else if (tsS3StreamEnabled) {
|
|
return s3GetObjectsByPrefix(id, path);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int32_t deleteCheckpoint(const char* id) {
|
|
if (id == NULL || strlen(id) == 0) {
|
|
stError("deleteCheckpoint parameters invalid");
|
|
return TSDB_CODE_INVALID_PARA;
|
|
}
|
|
if (strlen(tsSnodeAddress) != 0) {
|
|
return deleteRsync(id);
|
|
} else if (tsS3StreamEnabled) {
|
|
s3DeleteObjectsByPrefix(id);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
|
char object[128] = {0};
|
|
|
|
int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
|
|
if (nBytes <= 0 || nBytes >= sizeof(object)) {
|
|
return TSDB_CODE_OUT_OF_RANGE;
|
|
}
|
|
|
|
char* tmp = object;
|
|
int32_t code = s3DeleteObjects((const char**)&tmp, 1);
|
|
if (code != 0) {
|
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
|
}
|
|
return code;
|
|
}
|
|
|
|
/**
 * Flag the task as requiring a consensus checkpoint id, at most once until it is cleared.
 *
 * Under the task lock, status.sendConsensusChkptId is tested and set. If it was already set,
 * the call only logs and returns. Otherwise the backend is asserted to be NULL and
 * status.requireConsensusChkptId is set.
 *
 * @param pTask  the task that needs a consensus checkpoint id
 * @return always 0 (TSDB_CODE_SUCCESS)
 */
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  streamMutexLock(&pTask->lock);

  bool alreadyStarted = (pTask->status.sendConsensusChkptId == true);
  if (alreadyStarted) {
    stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", idStr);
  } else {
    pTask->status.sendConsensusChkptId = true;
  }

  streamMutexUnlock(&pTask->lock);

  if (alreadyStarted) {
    return TSDB_CODE_SUCCESS;
  }

  ASSERT(pTask->pBackend == NULL);
  pTask->status.requireConsensusChkptId = true;

  return 0;
}
|
|
|
|
/**
 * Send the checkpoint-source response for a source task that is currently in checkpoint status.
 *
 * Non-source tasks, and source tasks in any other status, are left untouched. The status check
 * and the send both happen while holding the task lock.
 *
 * @param pTask  the task to respond for
 * @return 0 when nothing is sent, otherwise the result of streamTaskSendCheckpointSourceRsp
 */
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t rspCode = 0;

  streamMutexLock(&pTask->lock);

  SStreamTaskState cur = streamTaskGetStatus(pTask);
  if (cur.state == TASK_STATUS__CK) {
    rspCode = streamTaskSendCheckpointSourceRsp(pTask);
  }

  streamMutexUnlock(&pTask->lock);

  return rspCode;
}