refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-31 14:23:46 +08:00
parent 69f95f9324
commit e2bb64eb18
6 changed files with 28 additions and 28 deletions

View File

@ -248,7 +248,7 @@ typedef struct SStreamChildEpInfo {
typedef struct SStreamTaskKey { typedef struct SStreamTaskKey {
int64_t streamId; int64_t streamId;
int64_t taskId; int32_t taskId;
} SStreamTaskKey; } SStreamTaskKey;
typedef struct SStreamTaskId { typedef struct SStreamTaskId {
@ -273,10 +273,10 @@ typedef struct SStreamStatus {
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SDataRange {
SVersionRange range; SVersionRange range;
STimeWindow window; STimeWindow window;
} SHistDataRange; } SDataRange;
typedef struct SSTaskBasicInfo { typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id int32_t nodeId; // vgroup id or snode id
@ -309,7 +309,6 @@ typedef struct STaskInputInfo {
typedef struct STaskSchedInfo { typedef struct STaskSchedInfo {
int8_t status; int8_t status;
// int64_t triggerParam;
void* pTimer; void* pTimer;
} STaskSchedInfo; } STaskSchedInfo;
@ -330,7 +329,7 @@ struct SStreamTask {
SStreamStatus status; SStreamStatus status;
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
STaskExec exec; STaskExec exec;
SHistDataRange dataRange; SDataRange dataRange;
SStreamTaskId historyTaskId; SStreamTaskId historyTaskId;
SStreamTaskId streamTaskId; SStreamTaskId streamTaskId;
STaskTimestamp tsInfo; STaskTimestamp tsInfo;
@ -419,6 +418,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const STaosQueue* pQueue); bool streamQueueIsFull(const STaosQueue* pQueue);
typedef struct { typedef struct {
@ -681,8 +681,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
// agg level // agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,

View File

@ -64,7 +64,7 @@ set(
"src/tq/tqPush.c" "src/tq/tqPush.c"
"src/tq/tqSink.c" "src/tq/tqSink.c"
"src/tq/tqCommit.c" "src/tq/tqCommit.c"
"src/tq/tqRestore.c" "src/tq/tqStreamTask.c"
"src/tq/tqSnapshot.c" "src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c" "src/tq/tqOffsetSnapshot.c"
"src/tq/tqStreamStateSnap.c" "src/tq/tqStreamStateSnap.c"

View File

@ -163,7 +163,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWalForStreamTasks(STQ* pTq); int32_t tqScanWal(STQ* pTq);
int32_t tqCheckAndRunStreamTask(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -1172,7 +1172,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) { if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
appendTranstateIntoInputQ(pTask); streamTaskPutTranstateIntoInputQ(pTask);
streamTryExec(pTask); // exec directly streamTryExec(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
@ -1346,7 +1346,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWalForStreamTasks(pTq); tqScanWal(pTq);
return 0; return 0;
} }

View File

@ -16,11 +16,12 @@
#include "tq.h" #include "tq.h"
#include "vnd.h" #include "vnd.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
// extract submit block from WAL, and add them into the input queue for the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWalForStreamTasks(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -31,7 +32,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) {
// check all tasks // check all tasks
bool shouldIdle = true; bool shouldIdle = true;
createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle); doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
int32_t times = 0; int32_t times = 0;
@ -140,7 +141,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
// for follower or vnode does not restored, do not launch the stream tasks. // do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -223,7 +224,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0; return 0;
} }
int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) { if (pTask->chkInfo.currentVer < firstVer) {
@ -267,7 +268,8 @@ int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { // todo handle memory error
void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer; int64_t maxVer = pTask->dataRange.range.maxVer;
@ -279,7 +281,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
appendTranstateIntoInputQ(pTask); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */ streamSchedExec(pTask); /*int32_t code = */ streamSchedExec(pTask);
} else { } else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
@ -288,7 +290,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
} }
} }
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true; *pScanIdle = true;
bool noDataInWal = true; bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
@ -356,7 +358,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false; *pScanIdle = false;
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = doSetOffsetForWalReader(pTask, vgId); int32_t code = setWalReaderStartOffset(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
@ -369,7 +371,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
@ -390,7 +392,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.currentVer = ver; pTask->chkInfo.currentVer = ver;
checkForFillHistoryVerRange(pTask, ver); handleFillhistoryScanComplete(pTask, ver);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
} else { } else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,

View File

@ -108,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
// check status // check status
static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
SHistDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window; STimeWindow* pWindow = &pRange->window;
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
@ -365,7 +365,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
return streamScanExec(pTask, 100); return streamScanExec(pTask, 100);
} }
int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) { if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -764,7 +764,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
SHistDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
"-%" PRId64, "-%" PRId64,
@ -775,7 +775,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} }
} else { } else {
SHistDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
int64_t ekey = 0; int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) { if (pRange->window.ekey < INT64_MAX) {