refactor: do some internal refactor.
This commit is contained in:
parent
46e9fe6f97
commit
7ad7ef7cfc
|
@ -13,9 +13,9 @@ extern "C" {
|
||||||
|
|
||||||
void stopRsync();
|
void stopRsync();
|
||||||
void startRsync();
|
void startRsync();
|
||||||
int uploadRsync(char* id, char* path);
|
int uploadRsync(const char* id, const char* path);
|
||||||
int downloadRsync(char* id, char* path);
|
int downloadRsync(const char* id, const char* path);
|
||||||
int deleteRsync(char* id);
|
int deleteRsync(const char* id);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
bool isLeader, bool restored);
|
bool isLeader, bool restored);
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||||
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||||
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
|
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
||||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
|
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
|
||||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -554,6 +554,12 @@ typedef struct SStreamMeta {
|
||||||
void* bkdChkptMgt;
|
void* bkdChkptMgt;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
|
typedef struct STaskUpdateEntry {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t taskId;
|
||||||
|
int32_t transId;
|
||||||
|
} STaskUpdateEntry;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
|
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
|
||||||
|
|
||||||
|
@ -794,6 +800,8 @@ void streamTaskInputFail(SStreamTask* pTask);
|
||||||
int32_t streamExecTask(SStreamTask* pTask);
|
int32_t streamExecTask(SStreamTask* pTask);
|
||||||
int32_t streamResumeTask(SStreamTask* pTask);
|
int32_t streamResumeTask(SStreamTask* pTask);
|
||||||
int32_t streamSchedExec(SStreamTask* pTask);
|
int32_t streamSchedExec(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
|
||||||
|
|
||||||
bool streamTaskShouldStop(const SStreamTask* pStatus);
|
bool streamTaskShouldStop(const SStreamTask* pStatus);
|
||||||
bool streamTaskShouldPause(const SStreamTask* pStatus);
|
bool streamTaskShouldPause(const SStreamTask* pStatus);
|
||||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||||
|
@ -889,6 +897,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId,
|
||||||
int64_t endTs, bool ready);
|
int64_t endTs, bool ready);
|
||||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
|
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||||
|
int64_t startTs);
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
#define ERRNO_ERR_DATA errno,strerror(errno)
|
#define ERRNO_ERR_DATA errno,strerror(errno)
|
||||||
|
|
||||||
// deleteRsync function produce empty directories, traverse base directory to remove them
|
// deleteRsync function produce empty directories, traverse base directory to remove them
|
||||||
static void removeEmptyDir(){
|
static void removeEmptyDir() {
|
||||||
TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
|
TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
|
||||||
if (pDir == NULL) return;
|
if (pDir == NULL) return;
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ static void changeDirFromWindowsToLinux(char* from, char* to){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int generateConfigFile(char* confDir){
|
static int generateConfigFile(char* confDir) {
|
||||||
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
|
uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
|
||||||
|
@ -111,7 +111,7 @@ static int execCommand(char* command){
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void stopRsync(){
|
void stopRsync() {
|
||||||
int code =
|
int code =
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
system("taskkill /f /im rsync.exe");
|
system("taskkill /f /im rsync.exe");
|
||||||
|
@ -125,7 +125,7 @@ void stopRsync(){
|
||||||
uDebug("[rsync] stop rsync server successful");
|
uDebug("[rsync] stop rsync server successful");
|
||||||
}
|
}
|
||||||
|
|
||||||
void startRsync(){
|
void startRsync() {
|
||||||
if(taosMulMkDir(tsCheckpointBackupDir) != 0){
|
if(taosMulMkDir(tsCheckpointBackupDir) != 0){
|
||||||
uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA);
|
uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA);
|
||||||
return;
|
return;
|
||||||
|
@ -151,7 +151,7 @@ void startRsync(){
|
||||||
uDebug("[rsync] start server successful");
|
uDebug("[rsync] start server successful");
|
||||||
}
|
}
|
||||||
|
|
||||||
int uploadRsync(char* id, char* path){
|
int uploadRsync(const char* id, const char* path) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
char pathTransform[PATH_MAX] = {0};
|
char pathTransform[PATH_MAX] = {0};
|
||||||
changeDirFromWindowsToLinux(path, pathTransform);
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
|
@ -188,7 +188,7 @@ int uploadRsync(char* id, char* path){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int downloadRsync(char* id, char* path){
|
int downloadRsync(const char* id, const char* path) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
char pathTransform[PATH_MAX] = {0};
|
char pathTransform[PATH_MAX] = {0};
|
||||||
changeDirFromWindowsToLinux(path, pathTransform);
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
|
@ -212,7 +212,7 @@ int downloadRsync(char* id, char* path){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteRsync(char* id){
|
int deleteRsync(const char* id) {
|
||||||
char* tmp = "./tmp_empty/";
|
char* tmp = "./tmp_empty/";
|
||||||
int code = taosMkDir(tmp);
|
int code = taosMkDir(tmp);
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
|
|
|
@ -149,27 +149,13 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pRunReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
||||||
numOfTasks, alreadyRestored);
|
numOfTasks, alreadyRestored);
|
||||||
|
|
||||||
pRunReq->head.vgId = vgId;
|
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||||
pRunReq->streamId = 0;
|
|
||||||
pRunReq->taskId = 0;
|
|
||||||
pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA;
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStopStreamTasksAsync(STQ* pTq) {
|
int32_t tqStopStreamTasksAsync(STQ* pTq) {
|
||||||
|
|
|
@ -17,12 +17,6 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
|
||||||
typedef struct STaskUpdateEntry {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t taskId;
|
|
||||||
int32_t transId;
|
|
||||||
} STaskUpdateEntry;
|
|
||||||
|
|
||||||
typedef struct SMStreamCheckpointReadyRspMsg {
|
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
} SMStreamCheckpointReadyRspMsg;
|
} SMStreamCheckpointReadyRspMsg;
|
||||||
|
@ -116,22 +110,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pRunReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
||||||
pRunReq->head.vgId = vgId;
|
|
||||||
pRunReq->streamId = 0;
|
|
||||||
pRunReq->taskId = 0;
|
|
||||||
pRunReq->reqType = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS;
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS;
|
||||||
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
return streamTaskSchedTask(cb, vgId, 0, 0, type);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
|
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
|
||||||
|
@ -143,22 +125,8 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pRunReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
|
tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
|
||||||
pRunReq->head.vgId = vgId;
|
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
|
||||||
pRunReq->streamId = streamId;
|
|
||||||
pRunReq->taskId = taskId;
|
|
||||||
pRunReq->reqType = STREAM_EXEC_T_START_ONE_TASK;
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
|
||||||
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
||||||
|
@ -259,6 +227,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// save
|
||||||
if (updated) {
|
if (updated) {
|
||||||
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
|
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
@ -269,22 +238,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
|
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stop
|
||||||
streamTaskStop(pTask);
|
streamTaskStop(pTask);
|
||||||
|
|
||||||
// keep the already updated info
|
|
||||||
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
|
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
if (ppHTask != NULL) {
|
if (ppHTask != NULL) {
|
||||||
streamTaskStop(*ppHTask);
|
streamTaskStop(*ppHTask);
|
||||||
tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms",
|
|
||||||
idstr, vgId, now - st);
|
|
||||||
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr,
|
|
||||||
vgId, now - st);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// keep info
|
||||||
|
streamMetaAddIntoUpdateTaskList(pMeta, pTask, (ppHTask != NULL) ? (*ppHTask) : NULL, req.transId, st);
|
||||||
|
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
|
|
||||||
// possibly only handle the stream task.
|
// possibly only handle the stream task.
|
||||||
|
@ -307,10 +269,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
|
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
|
||||||
#if 0
|
#if 0
|
||||||
// for test purpose, to trigger the leader election
|
taosMSleep(5000);// for test purpose, to trigger the leader election
|
||||||
taosMSleep(5000);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
tqStreamTaskStartAsync(pMeta, cb, true);
|
tqStreamTaskStartAsync(pMeta, cb, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -712,26 +672,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
|
|
||||||
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
|
|
||||||
if (numOfTasks == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
|
||||||
|
|
||||||
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
|
||||||
streamTaskResetStatus(*pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -781,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
} else {
|
} else {
|
||||||
streamMetaResetStartInfo(&pMeta->startInfo);
|
streamMetaResetStartInfo(&pMeta->startInfo);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
|
|
@ -1,308 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "streamInt.h"
|
|
||||||
#include "ttimer.h"
|
|
||||||
|
|
||||||
void* streamTimer = NULL;
|
|
||||||
|
|
||||||
int32_t streamTimerInit() {
|
|
||||||
streamTimer = taosTmrInit(1000, 100, 10000, "STREAM");
|
|
||||||
if (streamTimer == NULL) {
|
|
||||||
stError("init stream timer failed, code:%s", tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
stInfo("init stream timer, %p", streamTimer);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTimerCleanUp() {
|
|
||||||
stInfo("cleanup stream timer, %p", streamTimer);
|
|
||||||
taosTmrCleanUp(streamTimer);
|
|
||||||
streamTimer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tmr_h streamTimerGetInstance() {
|
|
||||||
return streamTimer;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
|
||||||
char buf[128] = {0};
|
|
||||||
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
|
||||||
return taosStrdup(buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void streamSchedByTimer(void* param, void* tmrId) {
|
|
||||||
SStreamTask* pTask = (void*)param;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
|
|
||||||
|
|
||||||
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
|
||||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
|
||||||
|
|
||||||
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
|
|
||||||
stDebug("s-task:%s jump out of schedTimer", id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
|
||||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
|
||||||
} else {
|
|
||||||
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
|
||||||
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
|
||||||
if (pTrigger == NULL) {
|
|
||||||
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
|
||||||
nextTrigger);
|
|
||||||
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTrigger->type = STREAM_INPUT__GET_RES;
|
|
||||||
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
|
||||||
if (pTrigger->pBlock == NULL) {
|
|
||||||
taosFreeQitem(pTrigger);
|
|
||||||
|
|
||||||
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
|
||||||
nextTrigger);
|
|
||||||
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
|
||||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
|
||||||
|
|
||||||
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamSchedExec(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|
||||||
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
|
||||||
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
|
|
||||||
|
|
||||||
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
|
|
||||||
|
|
||||||
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
|
|
||||||
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSchedExec(SStreamTask* pTask) {
|
|
||||||
if (streamTaskSetSchedStatusWait(pTask)) {
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pRunReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
|
|
||||||
stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRunReq->head.vgId = pTask->info.nodeId;
|
|
||||||
pRunReq->streamId = pTask->id.streamId;
|
|
||||||
pRunReq->taskId = pTask->id.taskId;
|
|
||||||
|
|
||||||
stDebug("trigger to run s-task:%s", pTask->id.idStr);
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
|
||||||
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
|
||||||
} else {
|
|
||||||
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
|
|
||||||
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
|
||||||
if (*pBuf == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
|
||||||
ASSERT(((SMsgHead*)(*pBuf))->vgId != 0);
|
|
||||||
|
|
||||||
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
|
||||||
|
|
||||||
pDispatchRsp->stage = htobe64(pReq->stage);
|
|
||||||
pDispatchRsp->msgId = htonl(pReq->msgId);
|
|
||||||
pDispatchRsp->inputStatus = status;
|
|
||||||
pDispatchRsp->streamId = htobe64(pReq->streamId);
|
|
||||||
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
|
||||||
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
|
||||||
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
|
||||||
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
|
||||||
int8_t status = 0;
|
|
||||||
|
|
||||||
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
streamTaskInputFail(pTask);
|
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
|
||||||
stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
|
||||||
pTask->id.idStr);
|
|
||||||
} else {
|
|
||||||
if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
|
||||||
pTask->status.appendTranstateBlock = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
|
|
||||||
// input queue is full, upstream is blocked now
|
|
||||||
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
|
||||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
|
||||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
|
|
||||||
// enqueue
|
|
||||||
if (pData != NULL) {
|
|
||||||
stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
|
||||||
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
|
|
||||||
|
|
||||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
|
||||||
pData->srcVgId = 0;
|
|
||||||
streamRetrieveReqToData(pReq, pData);
|
|
||||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
|
|
||||||
status = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
} else {
|
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
|
||||||
}
|
|
||||||
} else { // todo handle oom
|
|
||||||
/*streamTaskInputFail(pTask);*/
|
|
||||||
/*status = TASK_INPUT_STATUS__FAILED;*/
|
|
||||||
}
|
|
||||||
|
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
|
||||||
int32_t status = 0;
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
|
|
||||||
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
|
|
||||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
|
|
||||||
|
|
||||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
|
||||||
ASSERT(pInfo != NULL);
|
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
|
||||||
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
|
||||||
status = TASK_INPUT_STATUS__REFUSED;
|
|
||||||
} else {
|
|
||||||
if (pReq->stage > pInfo->stage) {
|
|
||||||
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
|
||||||
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
|
|
||||||
", current:%" PRId64 " dispatch msg rejected",
|
|
||||||
id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
|
|
||||||
status = TASK_INPUT_STATUS__REFUSED;
|
|
||||||
} else {
|
|
||||||
if (!pInfo->dataAllowed) {
|
|
||||||
stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId);
|
|
||||||
status = TASK_INPUT_STATUS__BLOCKED;
|
|
||||||
} else {
|
|
||||||
// This task has received the checkpoint req from the upstream task, from which all the messages should be
|
|
||||||
// blocked. Note that there is no race condition here.
|
|
||||||
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
|
||||||
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
|
||||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
|
||||||
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
|
|
||||||
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
|
|
||||||
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
|
||||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
|
||||||
|
|
||||||
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
|
|
||||||
STaskId* pRelTaskId = &pTask->streamTaskId;
|
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
|
|
||||||
if (pStreamTask != NULL) {
|
|
||||||
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
|
|
||||||
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
|
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
|
|
||||||
" close inputQ for upstream:0x%x",
|
|
||||||
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
status = streamTaskAppendInputBlocks(pTask, pReq);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// disable the data from upstream tasks
|
|
||||||
// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
|
|
||||||
// status = TASK_INPUT_STATUS__BLOCKED;
|
|
||||||
// }
|
|
||||||
|
|
||||||
{
|
|
||||||
// do send response with the input status
|
|
||||||
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
|
||||||
tmsgSendRsp(pRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamSchedExec(pTask);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
|
||||||
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
|
|
||||||
if(code != 0){
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
return streamSchedExec(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
|
||||||
|
|
||||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
|
|
||||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
|
||||||
if (pInfo->taskId == taskId) {
|
|
||||||
return pInfo;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
|
@ -342,23 +342,23 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pInfo->inCheckProcess) {
|
if (!pInfo->inCheckProcess) {
|
||||||
// stWarn("s-task:%s already not in-check-procedure", id);
|
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
||||||
|
stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el);
|
||||||
|
|
||||||
|
pInfo->startTs = 0;
|
||||||
|
pInfo->timeoutStartTs = 0;
|
||||||
|
pInfo->notReadyTasks = 0;
|
||||||
|
pInfo->inCheckProcess = 0;
|
||||||
|
pInfo->stopCheckProcess = 0;
|
||||||
|
|
||||||
|
pInfo->notReadyRetryCount = 0;
|
||||||
|
pInfo->timeoutRetryCount = 0;
|
||||||
|
|
||||||
|
taosArrayClear(pInfo->pList);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s already not in check-rsp procedure", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
|
||||||
stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el);
|
|
||||||
|
|
||||||
pInfo->startTs = 0;
|
|
||||||
pInfo->timeoutStartTs = 0;
|
|
||||||
pInfo->notReadyTasks = 0;
|
|
||||||
pInfo->inCheckProcess = 0;
|
|
||||||
pInfo->stopCheckProcess = 0;
|
|
||||||
|
|
||||||
pInfo->notReadyRetryCount = 0;
|
|
||||||
pInfo->timeoutRetryCount = 0;
|
|
||||||
|
|
||||||
taosArrayClear(pInfo->pList);
|
|
||||||
|
|
||||||
if (lock) {
|
if (lock) {
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
}
|
}
|
||||||
|
@ -527,23 +527,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
||||||
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
|
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
|
||||||
// of restart in timer thread will result in a dead lock.
|
// of restart in timer thread will result in a dead lock.
|
||||||
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
|
||||||
if (pRunReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId);
|
|
||||||
|
|
||||||
pRunReq->head.vgId = vgId;
|
|
||||||
pRunReq->streamId = streamId;
|
|
||||||
pRunReq->taskId = taskId;
|
|
||||||
pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK;
|
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
|
||||||
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function is executed in timer thread
|
// this function is executed in timer thread
|
||||||
|
|
|
@ -30,8 +30,8 @@ typedef struct {
|
||||||
|
|
||||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||||
static int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
|
||||||
static int32_t deleteCheckpoint(char* id);
|
static int32_t deleteCheckpoint(const char* id);
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
@ -578,7 +578,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t uploadCheckpointToS3(char* id, char* path) {
|
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
||||||
TdDirPtr pDir = taosOpenDir(path);
|
TdDirPtr pDir = taosOpenDir(path);
|
||||||
if (pDir == NULL) return -1;
|
if (pDir == NULL) return -1;
|
||||||
|
|
||||||
|
@ -631,7 +631,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskBackupCheckpoint(char* id, char* path) {
|
int32_t streamTaskBackupCheckpoint(const char* id, const char* path) {
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("streamTaskBackupCheckpoint parameters invalid");
|
stError("streamTaskBackupCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -675,7 +675,7 @@ int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t deleteCheckpoint(char* id) {
|
int32_t deleteCheckpoint(const char* id) {
|
||||||
if (id == NULL || strlen(id) == 0) {
|
if (id == NULL || strlen(id) == 0) {
|
||||||
stError("deleteCheckpoint parameters invalid");
|
stError("deleteCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1137,6 +1137,129 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
|
||||||
|
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||||
|
if (*pBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
||||||
|
ASSERT(((SMsgHead*)(*pBuf))->vgId != 0);
|
||||||
|
|
||||||
|
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
||||||
|
|
||||||
|
pDispatchRsp->stage = htobe64(pReq->stage);
|
||||||
|
pDispatchRsp->msgId = htonl(pReq->msgId);
|
||||||
|
pDispatchRsp->inputStatus = status;
|
||||||
|
pDispatchRsp->streamId = htobe64(pReq->streamId);
|
||||||
|
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||||
|
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||||
|
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
||||||
|
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
||||||
|
int8_t status = 0;
|
||||||
|
|
||||||
|
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
streamTaskInputFail(pTask);
|
||||||
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
|
stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||||
|
pTask->id.idStr);
|
||||||
|
} else {
|
||||||
|
if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
||||||
|
pTask->status.appendTranstateBlock = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
|
||||||
|
// input queue is full, upstream is blocked now
|
||||||
|
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
||||||
|
}
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||||
|
int32_t status = 0;
|
||||||
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
|
||||||
|
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
|
||||||
|
|
||||||
|
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
||||||
|
ASSERT(pInfo != NULL);
|
||||||
|
|
||||||
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
|
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
||||||
|
status = TASK_INPUT_STATUS__REFUSED;
|
||||||
|
} else {
|
||||||
|
if (pReq->stage > pInfo->stage) {
|
||||||
|
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
||||||
|
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
|
||||||
|
", current:%" PRId64 " dispatch msg rejected",
|
||||||
|
id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
|
||||||
|
status = TASK_INPUT_STATUS__REFUSED;
|
||||||
|
} else {
|
||||||
|
if (!pInfo->dataAllowed) {
|
||||||
|
stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId);
|
||||||
|
status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
|
} else {
|
||||||
|
// This task has received the checkpoint req from the upstream task, from which all the messages should be
|
||||||
|
// blocked. Note that there is no race condition here.
|
||||||
|
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
|
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
||||||
|
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||||
|
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
|
||||||
|
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
|
||||||
|
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
||||||
|
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||||
|
|
||||||
|
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
|
||||||
|
STaskId* pRelTaskId = &pTask->streamTaskId;
|
||||||
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
|
||||||
|
if (pStreamTask != NULL) {
|
||||||
|
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
|
||||||
|
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
|
||||||
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
|
||||||
|
" close inputQ for upstream:0x%x",
|
||||||
|
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
status = streamTaskAppendInputBlocks(pTask, pReq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// disable the data from upstream tasks
|
||||||
|
// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
|
||||||
|
// status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
|
// }
|
||||||
|
|
||||||
|
{
|
||||||
|
// do send response with the input status
|
||||||
|
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||||
|
tmsgSendRsp(pRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamSchedExec(pTask);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
|
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
||||||
|
|
|
@ -1216,7 +1216,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// need to stop, stop now
|
// need to stop, stop now
|
||||||
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
|
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta
|
||||||
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
|
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
|
||||||
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
|
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
|
||||||
taosReleaseRef(streamMetaId, rid);
|
taosReleaseRef(streamMetaId, rid);
|
||||||
|
@ -1307,7 +1307,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("vgId:%d start to check all tasks", vgId);
|
stDebug("vgId:%d start to check all tasks for closing", vgId);
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
|
@ -1757,4 +1757,26 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||||
|
int64_t startTs) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
|
||||||
|
// keep the already updated info
|
||||||
|
STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
|
||||||
|
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampMs() - startTs;
|
||||||
|
if (pHTask != NULL) {
|
||||||
|
STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
|
||||||
|
taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
|
||||||
|
|
||||||
|
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
|
||||||
|
" ms", id, vgId, transId, el);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
|
||||||
|
id, vgId, transId, el);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -424,4 +424,6 @@ void streamTaskPutbackToken(STokenBucket* pBucket) {
|
||||||
// size in KB
|
// size in KB
|
||||||
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
|
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
|
||||||
pBucket->quotaRemain -= SIZE_IN_MiB(bytes);
|
pBucket->quotaRemain -= SIZE_IN_MiB(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
|
@ -0,0 +1,129 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "streamInt.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
|
SStreamTask* pTask = (void*)param;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
|
||||||
|
|
||||||
|
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
||||||
|
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||||
|
|
||||||
|
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
|
||||||
|
stDebug("s-task:%s jump out of schedTimer", id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
||||||
|
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
||||||
|
} else {
|
||||||
|
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
|
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||||
|
if (pTrigger == NULL) {
|
||||||
|
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||||
|
nextTrigger);
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTrigger->type = STREAM_INPUT__GET_RES;
|
||||||
|
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
if (pTrigger->pBlock == NULL) {
|
||||||
|
taosFreeQitem(pTrigger);
|
||||||
|
|
||||||
|
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||||
|
nextTrigger);
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
|
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||||
|
|
||||||
|
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamSchedExec(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||||
|
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
|
||||||
|
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||||
|
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
|
||||||
|
|
||||||
|
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
|
||||||
|
|
||||||
|
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
|
||||||
|
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
|
if (streamTaskSetSchedStatusWait(pTask)) {
|
||||||
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
|
if (pRunReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRunReq->head.vgId = pTask->info.nodeId;
|
||||||
|
pRunReq->streamId = pTask->id.streamId;
|
||||||
|
pRunReq->taskId = pTask->id.taskId;
|
||||||
|
|
||||||
|
stDebug("trigger to run s-task:%s", pTask->id.idStr);
|
||||||
|
|
||||||
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
|
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
||||||
|
} else {
|
||||||
|
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
|
||||||
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
|
if (pRunReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
stError("vgId:%d failed to create msg to start stream task:0x%x, type:%d, code:%s", vgId, taskId, execType,
|
||||||
|
terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("vgId:%d create msg to start stream task:0x%x", vgId, taskId);
|
||||||
|
|
||||||
|
pRunReq->head.vgId = vgId;
|
||||||
|
pRunReq->streamId = streamId;
|
||||||
|
pRunReq->taskId = taskId;
|
||||||
|
pRunReq->reqType = execType;
|
||||||
|
|
||||||
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
|
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -992,3 +992,55 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
|
||||||
|
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||||
|
if (pInfo->taskId == taskId) {
|
||||||
|
return pInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
|
char buf[128] = {0};
|
||||||
|
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||||
|
return taosStrdup(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||||
|
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||||
|
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
|
// enqueue
|
||||||
|
if (pData != NULL) {
|
||||||
|
stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||||
|
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
|
||||||
|
|
||||||
|
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||||
|
pData->srcVgId = 0;
|
||||||
|
streamRetrieveReqToData(pReq, pData);
|
||||||
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||||
|
status = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
} else {
|
||||||
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
|
}
|
||||||
|
} else { // todo handle oom
|
||||||
|
/*streamTaskInputFail(pTask);*/
|
||||||
|
/*status = TASK_INPUT_STATUS__FAILED;*/
|
||||||
|
}
|
||||||
|
|
||||||
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||||
|
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
|
||||||
|
if(code != 0){
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
return streamSchedExec(pTask);
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "streamInt.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
void* streamTimer = NULL;
|
||||||
|
|
||||||
|
int32_t streamTimerInit() {
|
||||||
|
streamTimer = taosTmrInit(1000, 100, 10000, "STREAM");
|
||||||
|
if (streamTimer == NULL) {
|
||||||
|
stError("init stream timer failed, code:%s", tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
stInfo("init stream timer, %p", streamTimer);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTimerCleanUp() {
|
||||||
|
stInfo("cleanup stream timer, %p", streamTimer);
|
||||||
|
taosTmrCleanUp(streamTimer);
|
||||||
|
streamTimer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmr_h streamTimerGetInstance() {
|
||||||
|
return streamTimer;
|
||||||
|
}
|
Loading…
Reference in New Issue