fix snode crash

This commit is contained in:
yihaoDeng 2023-12-21 18:52:25 +08:00
parent 949ff6b18c
commit 913fda4a36
1 changed files with 37 additions and 40 deletions

View File

@ -19,26 +19,24 @@
#include "tqCommon.h"
#include "tuuid.h"
#define sndError(...) \
do { \
if (sndDebugFlag & DEBUG_ERROR) { \
taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \
} \
} while (0)
// clang-format off
#define sndError(...) do { if (sndDebugFlag & DEBUG_ERROR) {taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__);}} while (0)
#define sndInfo(...) do { if (sndDebugFlag & DEBUG_INFO) { taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__);}} while (0)
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
// clang-format on
#define sndInfo(...) \
do { \
if (sndDebugFlag & DEBUG_INFO) { \
taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define sndDebug(...) \
do { \
if (sndDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \
} \
} while (0)
static STaskId replaceStreamTaskId(SStreamTask *pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
@ -50,16 +48,12 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamTaskOpenAllUpstreamInput(pTask);
SStreamTask *pSateTask = pTask;
SStreamTask task = {0};
STaskId taskId = {0};
if (pTask->info.fillHistory) {
task.id.streamId = pTask->streamTaskId.streamId;
task.id.taskId = pTask->streamTaskId.taskId;
task.pMeta = pTask->pMeta;
pSateTask = &task;
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
return -1;
@ -67,6 +61,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
@ -90,8 +87,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
char *p = NULL;