Merge pull request #24179 from taosdata/fix/TD-27957-30
fix snode crash
This commit is contained in:
commit
67edc29a65
|
@ -19,26 +19,24 @@
|
||||||
#include "tqCommon.h"
|
#include "tqCommon.h"
|
||||||
#include "tuuid.h"
|
#include "tuuid.h"
|
||||||
|
|
||||||
#define sndError(...) \
|
// clang-format off
|
||||||
do { \
|
#define sndError(...) do { if (sndDebugFlag & DEBUG_ERROR) {taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__);}} while (0)
|
||||||
if (sndDebugFlag & DEBUG_ERROR) { \
|
#define sndInfo(...) do { if (sndDebugFlag & DEBUG_INFO) { taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__);}} while (0)
|
||||||
taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \
|
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
|
||||||
} \
|
// clang-format on
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define sndInfo(...) \
|
static STaskId replaceStreamTaskId(SStreamTask *pTask) {
|
||||||
do { \
|
ASSERT(pTask->info.fillHistory);
|
||||||
if (sndDebugFlag & DEBUG_INFO) { \
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \
|
pTask->id.streamId = pTask->streamTaskId.streamId;
|
||||||
} \
|
pTask->id.taskId = pTask->streamTaskId.taskId;
|
||||||
} while (0)
|
return id;
|
||||||
|
}
|
||||||
#define sndDebug(...) \
|
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
||||||
do { \
|
ASSERT(pTask->info.fillHistory);
|
||||||
if (sndDebugFlag & DEBUG_DEBUG) { \
|
pTask->id.taskId = pId->taskId;
|
||||||
taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \
|
pTask->id.streamId = pId->streamId;
|
||||||
} \
|
}
|
||||||
} while (0)
|
|
||||||
|
|
||||||
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
|
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
|
||||||
|
@ -50,23 +48,22 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
|
|
||||||
streamTaskOpenAllUpstreamInput(pTask);
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
|
||||||
SStreamTask *pSateTask = pTask;
|
STaskId taskId = {0};
|
||||||
SStreamTask task = {0};
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
task.id.streamId = pTask->streamTaskId.streamId;
|
taskId = replaceStreamTaskId(pTask);
|
||||||
task.id.taskId = pTask->streamTaskId.taskId;
|
|
||||||
task.pMeta = pTask->pMeta;
|
|
||||||
pSateTask = &task;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
|
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->info.fillHistory) {
|
||||||
|
restoreStreamTaskId(pTask, &taskId);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
|
@ -90,8 +87,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pTask->chkInfo.checkpointId != 0) {
|
if (pTask->chkInfo.checkpointId != 0) {
|
||||||
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
||||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
|
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *p = NULL;
|
char *p = NULL;
|
||||||
|
@ -99,18 +96,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
" nextProcessVer:%" PRId64
|
" nextProcessVer:%" PRId64
|
||||||
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
|
||||||
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
|
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
|
||||||
} else {
|
} else {
|
||||||
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
" nextProcessVer:%" PRId64
|
" nextProcessVer:%" PRId64
|
||||||
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
|
||||||
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
|
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -149,7 +146,7 @@ FAIL:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndInit(SSnode * pSnode) {
|
int32_t sndInit(SSnode *pSnode) {
|
||||||
resetStreamTaskStatus(pSnode->pMeta);
|
resetStreamTaskStatus(pSnode->pMeta);
|
||||||
startStreamTasks(pSnode->pMeta);
|
startStreamTasks(pSnode->pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue