Merge pull request #26169 from taosdata/fix/3_liaohj
fix(stream): not revise the stream id for fill-history task.
This commit is contained in:
commit
c3ae66c95c
|
@ -410,7 +410,7 @@ typedef struct SStateStore {
|
||||||
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
|
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
|
||||||
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
|
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
|
||||||
|
|
||||||
SStreamState* (*streamStateOpen)(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, int32_t szPage, int32_t pages);
|
||||||
void (*streamStateClose)(SStreamState* pState, bool remove);
|
void (*streamStateClose)(SStreamState* pState, bool remove);
|
||||||
int32_t (*streamStateBegin)(SStreamState* pState);
|
int32_t (*streamStateBegin)(SStreamState* pState);
|
||||||
int32_t (*streamStateCommit)(SStreamState* pState);
|
int32_t (*streamStateCommit)(SStreamState* pState);
|
||||||
|
|
|
@ -29,7 +29,8 @@ extern "C" {
|
||||||
|
|
||||||
#include "storageapi.h"
|
#include "storageapi.h"
|
||||||
|
|
||||||
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
|
||||||
|
int32_t szPage, int32_t pages);
|
||||||
void streamStateClose(SStreamState* pState, bool remove);
|
void streamStateClose(SStreamState* pState, bool remove);
|
||||||
int32_t streamStateBegin(SStreamState* pState);
|
int32_t streamStateBegin(SStreamState* pState);
|
||||||
int32_t streamStateCommit(SStreamState* pState);
|
int32_t streamStateCommit(SStreamState* pState);
|
||||||
|
|
|
@ -25,12 +25,6 @@
|
||||||
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
|
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
pTask->id.taskId = pId->taskId;
|
|
||||||
pTask->id.streamId = pId->streamId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
|
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
|
||||||
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
|
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
|
||||||
|
|
|
@ -299,7 +299,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
|
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
|
||||||
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
|
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
|
||||||
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
|
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
|
||||||
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId, true, -1, -1);
|
||||||
if (!pStreamState) {
|
if (!pStreamState) {
|
||||||
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -605,14 +605,15 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
if (pId->streamId != streamId) {
|
if (pId->streamId != streamId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
|
||||||
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask == NULL) {
|
if (ppTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId);
|
tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,37 +30,26 @@ typedef struct SMStreamCheckpointReadyRspMsg {
|
||||||
|
|
||||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
|
||||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
|
||||||
|
|
||||||
pTask->id.streamId = pTask->streamTaskId.streamId;
|
|
||||||
pTask->id.taskId = pTask->streamTaskId.taskId;
|
|
||||||
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
|
|
||||||
ASSERT(pTask->info.fillHistory);
|
|
||||||
pTask->id.taskId = pId->taskId;
|
|
||||||
pTask->id.streamId = pId->streamId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqExpandStreamTask(SStreamTask* pTask) {
|
int32_t tqExpandStreamTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
STaskId taskId = {0};
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
int64_t streamId = 0;
|
||||||
|
int32_t taskId = 0;
|
||||||
|
|
||||||
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
|
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
taskId = replaceStreamTaskId(pTask);
|
streamId = pTask->streamTaskId.streamId;
|
||||||
|
taskId = pTask->streamTaskId.taskId;
|
||||||
|
} else {
|
||||||
|
streamId = pTask->id.streamId;
|
||||||
|
taskId = pTask->id.taskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sink task does not need the pState
|
// sink task does not need the pState
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pMeta->path, pTask, false, streamId, taskId, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
|
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -69,10 +58,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
|
||||||
restoreStreamTaskId(pTask, &taskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.checkpointId = pTask->chkInfo.checkpointId,
|
.checkpointId = pTask->chkInfo.checkpointId,
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
|
@ -635,8 +620,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
|
|
||||||
// drop the related fill-history task firstly
|
// drop the related fill-history task firstly
|
||||||
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||||
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
|
||||||
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||||
|
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
|
|
|
@ -608,6 +608,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||||
|
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
|
||||||
|
|
||||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -617,7 +618,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
|
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
}
|
}
|
||||||
|
@ -672,14 +672,17 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) {
|
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
|
||||||
|
bool remove = false;
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
|
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
|
||||||
taosArrayRemove(pMeta->pTaskList, i);
|
taosArrayRemove(pTaskList, i);
|
||||||
|
remove = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ASSERT(remove);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||||
|
@ -715,7 +718,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
}
|
}
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
streamMetaRLock(pMeta);
|
streamMetaRLock(pMeta);
|
||||||
|
@ -742,18 +745,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
|
|
||||||
// it is an fill-history task, remove the related stream task's id that points to it
|
// it is an fill-history task, remove the related stream task's id that points to it
|
||||||
|
if (pTask->info.fillHistory == 0) {
|
||||||
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
|
}
|
||||||
|
|
||||||
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||||
streamMetaRemoveTask(pMeta, &id);
|
streamMetaRemoveTask(pMeta, &id);
|
||||||
|
|
||||||
|
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
ASSERT(pTask->status.timerActive == 0);
|
ASSERT(pTask->status.timerActive == 0);
|
||||||
|
|
||||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
||||||
taosTmrStop(pTask->schedInfo.pDelayTimer);
|
taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||||
|
@ -1007,9 +1011,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
|
||||||
|
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1020,7 +1025,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskStatusEntry entry = {
|
STaskStatusEntry entry = {
|
||||||
.id = *pId,
|
.id = id,
|
||||||
.status = streamTaskGetStatus(*pTask)->state,
|
.status = streamTaskGetStatus(*pTask)->state,
|
||||||
.nodeId = hbMsg.vgId,
|
.nodeId = hbMsg.vgId,
|
||||||
.stage = pMeta->stage,
|
.stage = pMeta->stage,
|
||||||
|
@ -1508,8 +1513,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
|
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
|
||||||
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask == NULL) {
|
if (ppTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
return winKeyCmprImpl(&pWin1->key, &pWin2->key);
|
return winKeyCmprImpl(&pWin1->key, &pWin2->key);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
|
||||||
|
int32_t szPage, int32_t pages) {
|
||||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
stDebug("open stream state %p, %s", pState, path);
|
stDebug("open stream state %p, %s", pState, path);
|
||||||
if (pState == NULL) {
|
if (pState == NULL) {
|
||||||
|
@ -114,8 +115,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pStreamTask = pTask;
|
SStreamTask* pStreamTask = pTask;
|
||||||
pState->taskId = pStreamTask->id.taskId;
|
pState->streamId = streamId;
|
||||||
pState->streamId = pStreamTask->id.streamId;
|
pState->taskId = taskId;
|
||||||
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId);
|
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId);
|
||||||
|
|
||||||
streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
|
streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
|
||||||
|
|
|
@ -46,7 +46,7 @@ SStreamState *stateCreate(const char *path) {
|
||||||
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
|
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
|
||||||
pTask->pMeta = pMeta;
|
pTask->pMeta = pMeta;
|
||||||
|
|
||||||
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
|
SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0, true, 32, 32 * 1024);
|
||||||
ASSERT(p != NULL);
|
ASSERT(p != NULL);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue