Merge pull request #26150 from taosdata/fix/3_liaohj
fix(stream): move the expansion of the stream task related task to right place.
This commit is contained in:
commit
62b69932b2
|
@ -43,7 +43,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
||||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
||||||
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||||
|
|
||||||
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta);
|
|
||||||
void tqSetRestoreVersionInfo(SStreamTask* pTask);
|
void tqSetRestoreVersionInfo(SStreamTask* pTask);
|
||||||
|
int32_t tqExpandStreamTask(SStreamTask* pTask);
|
||||||
|
|
||||||
#endif // TDENGINE_TQ_COMMON_H
|
#endif // TDENGINE_TQ_COMMON_H
|
||||||
|
|
|
@ -157,7 +157,8 @@ typedef enum EStreamTaskEvent {
|
||||||
|
|
||||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
|
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
|
||||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
typedef int32_t FTaskBuild(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||||
|
typedef int32_t FTaskExpand(SStreamTask* pTask);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
@ -486,7 +487,8 @@ typedef struct SStreamMeta {
|
||||||
SArray* pTaskList; // SArray<STaskId*>
|
SArray* pTaskList; // SArray<STaskId*>
|
||||||
void* ahandle;
|
void* ahandle;
|
||||||
TXN* txn;
|
TXN* txn;
|
||||||
FTaskExpand* expandFunc;
|
FTaskBuild* buildTaskFn;
|
||||||
|
FTaskExpand* expandTaskFn;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t role;
|
int32_t role;
|
||||||
|
@ -710,8 +712,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
|
||||||
// stream task meta
|
// stream task meta
|
||||||
void streamMetaInit();
|
void streamMetaInit();
|
||||||
void streamMetaCleanup();
|
void streamMetaCleanup();
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn,
|
||||||
startComplete_fn_t fn);
|
int32_t vgId, int64_t stage, startComplete_fn_t fn);
|
||||||
void streamMetaClose(SStreamMeta* streamMeta);
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
||||||
|
|
|
@ -25,14 +25,6 @@
|
||||||
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
|
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
static STaskId replaceStreamTaskId(SStreamTask *pTask) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
|
||||||
pTask->id.streamId = pTask->streamTaskId.streamId;
|
|
||||||
pTask->id.taskId = pTask->streamTaskId.taskId;
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
||||||
ASSERT(pTask->info.fillHistory);
|
ASSERT(pTask->info.fillHistory);
|
||||||
pTask->id.taskId = pId->taskId;
|
pTask->id.taskId = pId->taskId;
|
||||||
|
@ -85,7 +77,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
startRsync();
|
startRsync();
|
||||||
|
|
||||||
pSnode->msgCb = pOption->msgCb;
|
pSnode->msgCb = pOption->msgCb;
|
||||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
|
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
|
||||||
if (pSnode->pMeta == NULL) {
|
if (pSnode->pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
|
|
|
@ -264,7 +264,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqScanWal(STQ* pTq);
|
int32_t tqScanWal(STQ* pTq);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
|
|
|
@ -90,7 +90,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
|
|
||||||
int32_t tqInitialize(STQ* pTq) {
|
int32_t tqInitialize(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
|
pTq->pStreamMeta =
|
||||||
|
streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback);
|
||||||
if (pTq->pStreamMeta == NULL) {
|
if (pTq->pStreamMeta == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -713,7 +714,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
|
STQ* pTq = (STQ*) pTqObj;
|
||||||
|
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
|
tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
|
||||||
|
|
||||||
|
|
|
@ -541,14 +541,14 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
|
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
|
||||||
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
|
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
|
||||||
|
|
||||||
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) {
|
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t blockSize = 0;
|
int32_t blockSize = 0;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
||||||
int64_t ver = pInfo->processedVer;
|
int64_t ver = pInfo->processedVer;
|
||||||
|
|
||||||
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
|
stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
|
||||||
|
|
||||||
doSetStreamInputBlock(pTask, pBlock, &ver, id);
|
doSetStreamInputBlock(pTask, pBlock, &ver, id);
|
||||||
|
|
||||||
|
@ -607,7 +607,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
||||||
doStreamTaskExecImpl(pTask, pCheckpointBlock);
|
doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -698,7 +698,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type != STREAM_INPUT__CHECKPOINT) {
|
if (type != STREAM_INPUT__CHECKPOINT) {
|
||||||
doStreamTaskExecImpl(pTask, pInput);
|
doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
} else { // todo other thread may change the status
|
} else { // todo other thread may change the status
|
||||||
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
||||||
|
|
|
@ -299,8 +299,8 @@ void streamMetaRemoveDB(void* arg, char* key) {
|
||||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn,
|
||||||
startComplete_fn_t fn) {
|
int32_t vgId, int64_t stage, startComplete_fn_t fn) {
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -369,7 +369,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->scanInfo.scanCounter = 0;
|
pMeta->scanInfo.scanCounter = 0;
|
||||||
pMeta->vgId = vgId;
|
pMeta->vgId = vgId;
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->buildTaskFn = buildTaskFn;
|
||||||
|
pMeta->expandTaskFn = expandTaskFn;
|
||||||
pMeta->stage = stage;
|
pMeta->stage = stage;
|
||||||
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
|
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
|
||||||
pMeta->updateInfo.transId = -1;
|
pMeta->updateInfo.transId = -1;
|
||||||
|
@ -602,7 +603,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
if (pMeta->buildTaskFn(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -901,7 +902,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
|
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
|
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
@ -1522,6 +1523,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
||||||
|
|
||||||
|
@ -1541,40 +1543,22 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
if (pTask->pBackend == NULL) {
|
if (pTask->pBackend == NULL) {
|
||||||
int32_t code = expandFn(pTask);
|
code = expandFn(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
}
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
if (pHTask != NULL) {
|
|
||||||
if (pHTask->pBackend == NULL) {
|
|
||||||
code = expandFn(pHTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs);
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pHTask);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pHTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
|
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
||||||
|
|
|
@ -197,6 +197,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
const char* idStr = pTask->id.idStr;
|
const char* idStr = pTask->id.idStr;
|
||||||
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
|
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
|
||||||
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
|
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
ASSERT(hTaskId != 0);
|
ASSERT(hTaskId != 0);
|
||||||
|
|
||||||
// check stream task status in the first place.
|
// check stream task status in the first place.
|
||||||
|
@ -226,9 +229,20 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
|
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
|
||||||
} else { // exist, but not ready, continue check downstream task status
|
} else { // exist, but not ready, continue check downstream task status
|
||||||
|
if (pHisTask->pBackend == NULL) {
|
||||||
|
code = pMeta->expandTaskFn(pHisTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamMetaAddFailedTaskSelf(pHisTask, now);
|
||||||
|
stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
checkFillhistoryTaskStatus(pTask, pHisTask);
|
checkFillhistoryTaskStatus(pTask, pHisTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pHisTask);
|
streamMetaReleaseTask(pMeta, pHisTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,6 +320,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SLaunchHTaskInfo* pInfo = param;
|
SLaunchHTaskInfo* pInfo = param;
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
|
@ -362,14 +377,23 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
checkFillhistoryTaskStatus(pTask, pHTask);
|
if (pHTask->pBackend == NULL) {
|
||||||
streamMetaReleaseTask(pMeta, pHTask);
|
code = pMeta->expandTaskFn(pHTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamMetaAddFailedTaskSelf(pHTask, now);
|
||||||
|
stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
checkFillhistoryTaskStatus(pTask, pHTask);
|
||||||
// not in timer anymore
|
// not in timer anymore
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
|
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
|
||||||
pHTaskInfo->retryTimes, ref);
|
pHTaskInfo->retryTimes, ref);
|
||||||
}
|
}
|
||||||
|
streamMetaReleaseTask(pMeta, pHTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
|
@ -43,7 +43,7 @@ SStreamState *stateCreate(const char *path) {
|
||||||
pTask->ver = 1024;
|
pTask->ver = 1024;
|
||||||
pTask->id.streamId = 1023;
|
pTask->id.streamId = 1023;
|
||||||
pTask->id.taskId = 1111111;
|
pTask->id.taskId = 1111111;
|
||||||
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL);
|
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
|
||||||
pTask->pMeta = pMeta;
|
pTask->pMeta = pMeta;
|
||||||
|
|
||||||
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
|
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
|
||||||
|
|
Loading…
Reference in New Issue