fix(stream): not revise the stream id for fill-history task.

This commit is contained in:
Haojun Liao 2024-06-17 23:13:54 +08:00
parent f57ff96408
commit c4a2756907
6 changed files with 21 additions and 52 deletions

View File

@ -29,7 +29,8 @@ extern "C" {
#include "storageapi.h" #include "storageapi.h"
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove); void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);

View File

@ -25,12 +25,6 @@
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
// clang-format on // clang-format on
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);

View File

@ -299,7 +299,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask); pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo(); pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;

View File

@ -30,37 +30,26 @@ typedef struct SMStreamCheckpointReadyRspMsg {
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandStreamTask(SStreamTask* pTask) { int32_t tqExpandStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
STaskId taskId = {0};
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int64_t streamId = 0;
int32_t taskId = 0;
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask); streamId = pTask->streamTaskId.streamId;
taskId = pTask->streamTaskId.taskId;
} else {
streamId = pTask->id.streamId;
taskId = pTask->id.taskId;
} }
// sink task does not need the pState // sink task does not need the pState
if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pMeta->path, pTask, false, streamId, taskId, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1; return -1;
@ -69,10 +58,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
} }
} }
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = { SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId, .checkpointId = pTask->chkInfo.checkpointId,
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,

View File

@ -608,6 +608,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
taosArrayPush(pMeta->pTaskList, &pTask->id); taosArrayPush(pMeta->pTaskList, &pTask->id);
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
return -1; return -1;
@ -617,7 +618,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return -1; return -1;
} }
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
@ -672,21 +672,16 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
} }
} }
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
bool remove = false; bool remove = false;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId); taosArrayRemove(pTaskList, i);
taosArrayRemove(pMeta->pTaskList, i);
remove = true; remove = true;
break; break;
} else {
stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x, entry:0x%" PRIx64 "-0x%x", pMeta->vgId, id->streamId,
id->taskId, pTaskId->streamId, pTaskId->taskId);
} }
} }
ASSERT(remove); ASSERT(remove);
} }
@ -750,26 +745,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) { if (ppTask) {
pTask = *ppTask; pTask = *ppTask;
SStreamTaskId pxId = pTask->id;
ASSERT((pxId.taskId == id.taskId) && (pxId.streamId == id.streamId));
// it is an fill-history task, remove the related stream task's id that points to it // it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
streamMetaRemoveTask(pMeta, &id); streamMetaRemoveTask(pMeta, &id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
taosTmrStop(pTask->schedInfo.pDelayTimer); taosTmrStop(pTask->schedInfo.pDelayTimer);

View File

@ -98,7 +98,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return winKeyCmprImpl(&pWin1->key, &pWin2->key); return winKeyCmprImpl(&pWin1->key, &pWin2->key);
} }
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
int32_t szPage, int32_t pages) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path); stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) { if (pState == NULL) {
@ -114,8 +115,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int3
} }
SStreamTask* pStreamTask = pTask; SStreamTask* pStreamTask = pTask;
pState->taskId = pStreamTask->id.taskId; pState->streamId = streamId;
pState->streamId = pStreamTask->id.streamId; pState->taskId = taskId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId); sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId);
streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr); streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);