Merge pull request #26242 from taosdata/fix/3_liaohj

fix(stream): disable pause if task is un-init.
This commit is contained in:
Haojun Liao 2024-06-23 10:01:49 +08:00 committed by GitHub
commit ffef405c3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 224 additions and 148 deletions

View File

@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen);
void tqSetRestoreVersionInfo(SStreamTask* pTask);
int32_t tqExpandStreamTask(SStreamTask* pTask);

View File

@ -681,7 +681,8 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pInfo, int32_t code);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
@ -770,7 +771,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode);
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
// stream task state machine, and event handling

View File

@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
return 0;
return true;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
{ // check for tasks, if tasks are not ready, not allowed to pause
bool found = false;
bool readyToPause = true;
taosThreadMutexLock(&execInfo.lock);
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->id.streamId != pStream->uid) {
continue;
}
if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
readyToPause = false;
}
found = true;
}
taosThreadMutexUnlock(&execInfo.lock);
if (!found) {
mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (!readyToPause) {
mError("stream:%s task not ready for pause yet", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
}
STrans *pTrans =
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) {

View File

@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_RESUME:
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}

View File

@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
}
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen);
}
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {

View File

@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0;
}
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) {
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
int32_t vgId = pMeta->vgId;
@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, in
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq);
streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
} else { // failed to get the task.
tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
vgId, pReq->taskId);
}
streamMetaWUnLock(pMeta);
// always return success when handling the requirement issued by mnode during transaction.
return TSDB_CODE_SUCCESS;
}
@ -853,7 +854,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
} else if (pState->state == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
// /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
@ -883,7 +883,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
@ -901,7 +901,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@ -914,7 +914,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);
@ -922,7 +922,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
}
streamMetaReleaseTask(pMeta, pTask);
@ -998,8 +998,18 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (status == TASK_STATUS__UNINIT) {
}
ASSERT (status != TASK_STATUS__UNINIT); /*{
// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}*/
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
@ -1025,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else {
streamTrySchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}
} else {
ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ?
// if (pTask->info.fillHistory == 0) {
// tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
// }
// }
}
streamMetaReleaseTask(pMeta, pTask);

View File

@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* id);
void* taskAcquireDb(int64_t refId);
void taskReleaseDb(int64_t refId);

View File

@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
void streamClearChkptReadyMsg(SStreamTask* pTask);
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);

View File

@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
stInfo("%s newly create db in state-backend", key);
// pre create db
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) goto _EXIT;
if (pTaskDb->db == NULL) {
stError("%s open state-backend failed, reason:%s", key, err);
goto _EXIT;
}
rocksdb_close(pTaskDb->db);
if (cfNames != NULL) {
@ -2181,7 +2185,6 @@ void taskDbDestroy(void* pDb, bool flush) {
void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pDb->refId;
@ -2202,15 +2205,15 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code;
}
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) {
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) {
int32_t code = 0;
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32);
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) {
cleanDir(temp, "");
cleanDir(temp, idStr);
} else {
taosMkDir(temp);
}
@ -2220,7 +2223,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code;
}
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) {
int32_t code = -1;
STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type;
@ -2229,7 +2233,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t typ
if (utype == DATA_UPLOAD_RSYNC) {
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == DATA_UPLOAD_S3) {
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
}
taskDbUnRefChkp(pDb, chkpId);
return code;

View File

@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage < stage) {
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status);
streamMetaReleaseTask(pMeta, pTask);
} else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64

View File

@ -18,16 +18,6 @@
#include "streamBackendRocksdb.h"
#include "streamInt.h"
typedef struct {
ECHECKPOINT_BACKUP_TYPE type;
char* taskId;
int64_t chkpId;
SStreamTask* pTask;
int64_t dbRefId;
void* pMeta;
} SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
@ -114,8 +104,15 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) {
SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp));
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
@ -130,7 +127,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
pRsp->rspCode = code;
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo};
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
@ -408,11 +405,11 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
}
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) {
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
@ -429,7 +426,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
pReq->transId);
taosThreadMutexUnlock(&pTask->lock);
{ // destroy the related fill-history tasks
{ // destroy the related fill-history tasks
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
@ -446,34 +443,42 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
if (pStatus->state != TASK_STATUS__DROPPING) {
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer);
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, false);
// todo handle error
if (pStatus->state == TASK_STATUS__CK) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
}
} else {
stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed",
pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
if ((!restored) && (pStatus->state != TASK_STATUS__CK)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
}
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer);
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, true);
if (pStatus->state == TASK_STATUS__CK) {
// todo handle error
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
}
if (pReq->dropRelHTask) {
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, pReq->hTaskId);
@ -562,78 +567,70 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
return code;
}
int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* pParam = param;
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
char* path = NULL;
int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
int64_t now = taosGetTimestampMs();
SStreamMeta* pMeta = pTask->pMeta;
const char* idStr = pTask->id.idStr;
void* pBackend = taskAcquireDb(pParam->dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db", taskStr);
taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
return -1;
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
pTask->id.idStr)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
}
if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
}
if (pParam->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
if (type == DATA_UPLOAD_S3) {
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
}
}
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadCheckpoint(pParam->taskId, path);
code = streamTaskUploadCheckpoint(idStr, path);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
} else {
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
}
}
taskReleaseDb(pParam->dbRefId);
if (code == 0) {
if (code == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(toDelFiles);
stDebug("s-task:%s remove redundant %d files", taskStr, size);
stDebug("s-task:%s remove redundant %d files", idStr, size);
for (int i = 0; i < size; i++) {
char* pName = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(pParam->taskId, pName);
code = deleteCheckpointFile(idStr, pName);
if (code != 0) {
stDebug("s-task:%s failed to del file: %s", taskStr, pName);
stDebug("s-task:%s failed to remove file: %s", idStr, pName);
break;
}
}
stDebug("s-task:%s remove redundant files done", taskStr);
stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
}
taosArrayDestroyP(toDelFiles, taosMemoryFree);
double el = (taosGetTimestampMs() - now) / 1000.0;
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
idStr, checkpointId, el, path);
taosRemoveDir(path);
} else {
stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
idStr, checkpointId, el);
}
taosMemoryFree(path);
taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
return code;
}
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) {
stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
return 0;
}
@ -641,15 +638,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
arg->chkpId = checkpointId;
arg->pTask = pTask;
arg->dbRefId = taskGetDBRef(pTask->pBackend);
arg->pMeta = pTask->pMeta;
int64_t dbRefId = taskGetDBRef(pTask->pBackend);
void* pBackend = taskAcquireDb(dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
return -1;
}
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
int32_t code = uploadCheckpointData(pTask, checkpointId, taskGetDBRef(pTask->pBackend), type);
taskReleaseDb(dbRefId);
return code;
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@ -670,6 +669,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
}
// TODO: monitoring the checkpoint-source msg
// send check point response to upstream task
if (code == TSDB_CODE_SUCCESS) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -679,38 +679,39 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
// todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
tstrerror(code));
}
}
// update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
}
} else {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
}
} else {
stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
}
// clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) {
// TODO: monitoring the checkpoint-report msg
// update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
if (code == TSDB_CODE_SUCCESS) {
if (pTask->pMsgCb != NULL) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
}
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
@ -739,14 +740,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
}
pActiveInfo->checkCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now);
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr,
vgId, ref);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
@ -756,8 +756,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d",
pTask->id.idStr, vgId, ref);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
@ -815,6 +815,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
const char* pId = pTask->id.idStr;
int32_t size = taosArrayGetSize(pNotSendList);
int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask);
int64_t checkpointId = pTask->chkInfo.pActiveInfo->activeId;
if (size <= 0) {
stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
@ -840,15 +841,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
pReq->downstreamNodeId = vgId;
pReq->upstreamTaskId = pUpstreamTask->taskId;
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pReq->checkpointId = checkpointId;
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId,
pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId);
pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
}
return TSDB_CODE_SUCCESS;

View File

@ -781,7 +781,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
@ -795,6 +795,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
pActiveInfo->sendReadyCheckCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState->name, ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
@ -844,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
"and quit from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pActiveInfo);
taosThreadMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -906,9 +918,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pList);
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
} else {
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel);
}
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
@ -1116,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
return 0;
}
void streamClearChkptReadyMsg(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
if (pActiveInfo == NULL) {
return;
}

View File

@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
return streamMetaCvtDbFormat(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
stError(

View File

@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHisTask);
}
}
streamMetaReleaseTask(pMeta, pHisTask);

View File

@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader);
}
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
if (pTask->msgInfo.pData != NULL) {
clearBufferedDispatchMsg(pTask);
@ -836,7 +836,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {

View File

@ -623,9 +623,9 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);