fix(stream): do dome internal refactor.

This commit is contained in:
Haojun Liao 2023-08-28 15:54:01 +08:00
parent 33672f2f35
commit 391103bc5a
7 changed files with 55 additions and 133 deletions

View File

@ -258,11 +258,16 @@ typedef struct SStreamChildEpInfo {
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo;
typedef struct SStreamId {
typedef struct SStreamTaskKey {
int64_t streamId;
int64_t taskId;
} SStreamTaskKey;
typedef struct SStreamTaskId {
int64_t streamId;
int32_t taskId;
const char* idStr;
} SStreamId;
} SStreamTaskId;
typedef struct SCheckpointInfo {
int64_t checkpointId;
@ -275,7 +280,6 @@ typedef struct SStreamStatus {
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
@ -317,7 +321,7 @@ typedef struct {
struct SStreamTask {
int64_t ver;
SStreamId id;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo;
@ -325,8 +329,8 @@ struct SStreamTask {
SCheckpointInfo chkInfo;
STaskExec exec;
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SStreamTaskId historyTaskId;
SStreamTaskId streamTaskId;
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
STaskTimestamp tsInfo;
@ -630,12 +634,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
@ -655,7 +655,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@ -683,6 +682,10 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUp
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
@ -703,36 +706,28 @@ void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage);
void streamMetaClose(SStreamMeta* streamMeta);
// save to stream meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
void streamMetaNotifyClose(SStreamMeta* pMeta);
#ifdef __cplusplus
}

View File

@ -1278,8 +1278,6 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
pTask->status.transferState = true;
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
@ -1803,7 +1801,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// bool allStopped = true;
// int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
// for(int32_t i = 0; i < numOfCount; ++i) {
// SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
// SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
//
// int64_t keys1[2] = {pId->streamId, pId->taskId};
// SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1));

View File

@ -74,7 +74,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
@ -254,7 +254,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;

View File

@ -267,7 +267,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int64_t keys[2];
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
keys[0] = pId->streamId;
keys[1] = pId->taskId;

View File

@ -499,51 +499,6 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return code;
}
///**
// * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
// * appropriate batch of blocks should be handled in 5 to 10 sec.
// */
// int32_t streamExecForAll(SStreamTask* pTask) {
// const char* id = pTask->id.idStr;
//
// while (1) {
// int32_t batchSize = 0;
// SStreamQueueItem* pInput = NULL;
//
// // merge multiple input data if possible in the input queue.
// qDebug("s-task:%s start to extract data block from inputQ", id);
//
// /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
// if (pInput == NULL) {
// ASSERT(batchSize == 0);
// if (pTask->info.fillHistory && pTask->status.transferState) {
// int32_t code = streamTransferStateToStreamTask(pTask);
// if (code != TSDB_CODE_SUCCESS) { // todo handle this
// return 0;
// }
// }
//
// break;
// }
//
// if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
// ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
// qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
// streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
// continue;
// }
//
// int64_t st = taosGetTimestampMs();
// qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
//
//// {
// // set input
// const SStreamQueueItem* pItem = pInput;
// qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
//
// int64_t ver = pTask->chkInfo.checkpointVer;
// doSetStreamInputBlock(pTask, pInput, &ver, id);
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.

View File

@ -23,6 +23,7 @@
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
@ -39,8 +40,9 @@ SGStreamMetaMgt gStreamMetaMgt;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
void streamMetaCloseImpl(void* arg);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
@ -105,7 +107,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
@ -124,7 +126,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId));
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -311,7 +313,10 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
int64_t key[2] = {0};
extractStreamTaskKey(key, pTask);
if (tdbTbUpsert(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) {
qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
return -1;
}
@ -320,12 +325,17 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) {
pKey[0] = pTask->id.streamId;
pKey[1] = pTask->id.taskId;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) {
int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], tstrerror(terrno));
} else {
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId);
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]);
}
return code;
@ -400,9 +410,9 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
}
}
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) {
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) {
for (int32_t i = 0; i < num; ++i) {
SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
taosArrayRemove(pMeta->pTaskList, i);
break;
@ -460,7 +470,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
streamMetaRemoveTask(pMeta, taskId);
streamMetaRemoveTask(pMeta, keys);
streamMetaReleaseTask(pMeta, pTask);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
@ -558,7 +568,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t));
SArray* pRecycleList = taosArrayInit(4, STREAM_TASK_KEY_LEN);
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
@ -585,7 +595,10 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
taosArrayPush(pRecycleList, &taskId);
int64_t key[2] = {0};
extractStreamTaskKey(key, pTask);
taosArrayPush(pRecycleList, key);
int32_t total = taosArrayGetSize(pRecycleList);
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
continue;
@ -628,8 +641,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, taskId);
int64_t* pId = taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, pId);
}
}
@ -715,7 +728,7 @@ void metaHbToMnode(void* param, void* tmrId) {
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));

View File

@ -493,45 +493,6 @@ int32_t streamTaskStop(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) {
const char* id = pTask->id.idStr;
int64_t stage = pTask->pMeta->stage;
int32_t vgId = pTask->pMeta->vgId;
qDebug("s-task:%s vgId:%d restart current task, stage:%" PRId64 ", status:%s, sched-status:%d", id, vgId, stage,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// 1. stop task
streamTaskStop(pTask);
// 2. clear state info
streamQueueCleanup(pTask->inputQueue);
streamQueueCleanup(pTask->outputInfo.queue);
taosArrayClear(pTask->checkReqIds);
taosArrayClear(pTask->pRspMsgList);
// reset the upstream task stage info
streamTaskResetUpstreamStageInfo(pTask);
pTask->status.downstreamReady = 0;
// todo: handle the case when the task is in fill-history (step 1) phase
streamSetStatusNormal(pTask);
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
streamMetaCommit(pTask->pMeta);
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s vgId:%d restart completed", pTask->id.idStr, vgId);
// 3. start to check the downstream status
if (startTask) {
streamTaskCheckDownstreamTasks(pTask);
}
return 0;
}
int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
char buf[512] = {0};