Merge pull request #22665 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2023-08-31 15:45:42 +08:00 committed by GitHub
commit 8d48fd171f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 142 additions and 209 deletions

View File

@ -50,7 +50,6 @@ enum {
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK_READY,
};
enum {
@ -179,7 +178,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
typedef struct {
STaosQueue* queue;
STaosQueue* pQueue;
STaosQall* qall;
void* qItem;
int8_t status;
@ -190,19 +189,9 @@ void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
queue->qItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
void* streamQueueNextItem(SStreamQueue* pQueue);
void streamQueueProcessSuccess(SStreamQueue* queue);
void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
@ -259,7 +248,7 @@ typedef struct SStreamChildEpInfo {
typedef struct SStreamTaskKey {
int64_t streamId;
int64_t taskId;
int32_t taskId;
} SStreamTaskKey;
typedef struct SStreamTaskId {
@ -284,10 +273,10 @@ typedef struct SStreamStatus {
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;
typedef struct SHistDataRange {
typedef struct SDataRange {
SVersionRange range;
STimeWindow window;
} SHistDataRange;
} SDataRange;
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
@ -297,6 +286,7 @@ typedef struct SSTaskBasicInfo {
int32_t totalLevel;
int8_t taskLevel;
int8_t fillHistory; // is fill history task or not
int64_t triggerParam; // in msec
} SSTaskBasicInfo;
typedef struct SDispatchMsgInfo {
@ -306,12 +296,22 @@ typedef struct SDispatchMsgInfo {
int64_t blockingTs; // output blocking timestamp
} SDispatchMsgInfo;
typedef struct {
typedef struct STaskOutputInfo {
int8_t type;
int8_t status;
SStreamQueue* queue;
} STaskOutputInfo;
typedef struct STaskInputInfo {
int8_t status;
SStreamQueue* queue;
} STaskInputInfo;
typedef struct STaskSchedInfo {
int8_t status;
void* pTimer;
} STaskSchedInfo;
typedef struct {
int64_t init;
int64_t step1Start;
@ -323,15 +323,15 @@ struct SStreamTask {
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputInfo outputInfo;
STaskInputInfo inputInfo;
STaskSchedInfo schedInfo;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SHistDataRange dataRange;
SDataRange dataRange;
SStreamTaskId historyTaskId;
SStreamTaskId streamTaskId;
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
STaskTimestamp tsInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
@ -346,13 +346,6 @@ struct SStreamTask {
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
SStreamQueue* inputQueue;
// trigger
int8_t triggerStatus;
int64_t triggerParam;
void* schedTimer;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
@ -425,6 +418,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const STaosQueue* pQueue);
typedef struct {
@ -597,14 +591,6 @@ typedef struct SStreamTaskNodeUpdateMsg {
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct SStreamTaskNodeUpdateRsp {
int64_t streamId;
int32_t taskId;
} SStreamTaskNodeUpdateRsp;
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg);
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg);
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
@ -695,8 +681,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,

View File

@ -99,7 +99,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->triggerParam);
pTask->info.fillHistory, pTask->info.triggerParam);
return 0;
}

View File

@ -64,7 +64,7 @@ set(
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqRestore.c"
"src/tq/tqStreamTask.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
"src/tq/tqStreamStateSnap.c"

View File

@ -163,7 +163,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWalForStreamTasks(STQ* pTq);
int32_t tqScanWal(STQ* pTq);
int32_t tqCheckAndRunStreamTask(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -915,7 +915,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->triggerParam);
pTask->info.fillHistory, pTask->info.triggerParam);
return 0;
}
@ -1172,7 +1172,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
appendTranstateIntoInputQ(pTask);
streamTaskPutTranstateIntoInputQ(pTask);
streamTryExec(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
@ -1346,7 +1346,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWalForStreamTasks(pTq);
tqScanWal(pTq);
return 0;
}
@ -1504,7 +1504,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputInfo.queue->pQueue) == 0)) {
tqScanWalAsync(pTq, false);
} else {
streamSchedExec(pTask);
@ -1832,44 +1832,3 @@ _end:
return rsp.code;
}
int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId,
pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr);
streamTaskStop(pTask);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskStop(pHistoryTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
tmsgSendRsp(&rsp);
return 0;
}

View File

@ -16,11 +16,12 @@
#include "tq.h"
#include "vnd.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId);
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
// extract submit block from WAL, and add them into the input queue for the sources tasks.
int32_t tqScanWalForStreamTasks(STQ* pTq) {
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs();
@ -31,7 +32,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) {
// check all tasks
bool shouldIdle = true;
createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle);
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
int32_t times = 0;
@ -140,7 +141,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
// for follower or vnode does not restored, do not launch the stream tasks.
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
return TSDB_CODE_SUCCESS;
}
@ -223,7 +224,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0;
}
int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) {
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) {
@ -267,7 +268,8 @@ int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
// todo handle memory error
void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer;
@ -279,7 +281,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
appendTranstateIntoInputQ(pTask);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */ streamSchedExec(pTask);
} else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
@ -288,7 +290,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
}
}
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
@ -340,14 +342,14 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
if (streamQueueIsFull(pTask->inputQueue->queue)) {
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
@ -356,7 +358,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false;
// seek the stored version and extract data from WAL
int32_t code = doSetOffsetForWalReader(pTask, vgId);
int32_t code = setWalReaderStartOffset(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
@ -369,7 +371,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
@ -390,7 +392,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (code == TSDB_CODE_SUCCESS) {
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.currentVer = ver;
checkForFillHistoryVerRange(pTask, ver);
handleFillhistoryScanComplete(pTask, ver);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,

View File

@ -89,7 +89,6 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
taosArrayDestroy(pReader->tdbTbList);
tdbTbcClose(pReader->pCur);
taosMemoryFree(pReader);
return code;
}

View File

@ -87,6 +87,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
SVnode *pVnode = pReader->pVnode;
int32_t vgId = TD_VID(pReader->pVnode);
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
@ -220,30 +221,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
// STREAM ============
vInfo("stream task start");
vInfo("vgId:%d stream task start", vgId);
if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) {
vInfo("stream task start 1");
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
if (code) {
vInfo("stream task start err");
vError("vgId:%d open streamtask snapshot reader failed, code:%s", vgId, tstrerror(code));
goto _err;
}
}
code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
vInfo("stream task start 2");
if (code) {
vInfo("stream task start 3");
vError("vgId:%d error happens during read data from streatask snapshot, code:%s", vgId, tstrerror(code));
goto _err;
} else {
if (*ppData) {
vInfo("vgId:%d no streamTask snapshot", vgId);
goto _exit;
vInfo("stream task start 4");
} else {
pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
vInfo("stream task start 5");
if (code) goto _err;
if (code) {
goto _err;
}
pReader->pStreamTaskReader = NULL;
}
}
@ -305,15 +306,15 @@ _exit:
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
pHdr->type, *nData);
} else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
}
return code;
_err:
vError("vgId:%d, vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
vError("vgId:%d, vnode snapshot read failed since %s", vgId, tstrerror(code));
return code;
}

View File

@ -77,9 +77,6 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
#ifdef __cplusplus
}
#endif

View File

@ -59,8 +59,8 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
static void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
int8_t status = atomic_load_8(&pTask->triggerStatus);
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
int8_t status = atomic_load_8(&pTask->schedInfo.status);
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->info.triggerParam);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
@ -80,29 +80,29 @@ static void streamSchedByTimer(void* param, void* tmrId) {
return;
}
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
taosFreeQitem(pTrigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
return;
}
streamSchedExec(pTask);
}
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedTimer == NULL);
ASSERT(ref == 2 && pTask->schedInfo.pTimer == NULL);
qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->triggerParam);
qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
@ -224,7 +224,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
if (code != 0) {
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
}
@ -299,7 +299,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0;
}
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);

View File

@ -155,7 +155,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
int32_t code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
@ -217,7 +217,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
}
if (taskLevel == TASK_LEVEL__SINK) {
pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
id, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
@ -231,8 +230,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask);
}
@ -314,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0;

View File

@ -493,7 +493,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
const char* id = pTask->id.idStr;
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue);
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue);
if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems);
}
@ -995,7 +995,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
@ -1012,7 +1012,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
pTask->msgInfo.blockingTs = 0;
// put data into inputQ of current task is also allowed
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
}
// now ready for next data output
@ -1062,19 +1062,3 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -83,7 +83,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
taosMsleep(1000);
continue;
@ -192,7 +192,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
return 0;
}
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
@ -249,8 +249,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
}
int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue);
int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall);
int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue);
int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall);
return numOfItems1 + numOfItems2;
}
@ -360,7 +360,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@ -472,7 +472,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
@ -577,7 +577,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
// pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskBuildCheckpoint(pTask);
@ -615,7 +614,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}

View File

@ -261,10 +261,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
SStreamTask* p = *(SStreamTask**)pIter;
// release the ref by timer
if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
if (p->info.triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
taosTmrStop(p->schedTimer);
p->triggerParam = 0;
taosTmrStop(p->schedInfo.pTimer);
p->info.triggerParam = 0;
streamMetaReleaseTask(pMeta, p);
}

View File

@ -19,7 +19,6 @@
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
// todo refactor:
// read data from input queue
@ -46,28 +45,28 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
return NULL;
}
pQueue->queue = taosOpenQueue();
pQueue->pQueue = taosOpenQueue();
pQueue->qall = taosAllocateQall();
if (pQueue->queue == NULL || pQueue->qall == NULL) {
if (pQueue->queue) taosCloseQueue(pQueue->queue);
if (pQueue->pQueue == NULL || pQueue->qall == NULL) {
if (pQueue->pQueue) taosCloseQueue(pQueue->pQueue);
if (pQueue->qall) taosFreeQall(pQueue->qall);
taosMemoryFree(pQueue);
return NULL;
}
pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->queue, cap);
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
taosSetQueueCapacity(pQueue->pQueue, cap);
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
return pQueue;
}
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue, taosQueueItemSize(pQueue->pQueue));
streamQueueCleanup(pQueue);
taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue);
taosCloseQueue(pQueue->pQueue);
taosMemoryFree(pQueue);
}
@ -81,7 +80,7 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
pQueue->qItem = NULL;
taosGetQitem(pQueue->qall, &pQueue->qItem);
if (pQueue->qItem == NULL) {
taosReadAllQitems(pQueue->queue, pQueue->qall);
taosReadAllQitems(pQueue->pQueue, pQueue->qall);
taosGetQitem(pQueue->qall, &pQueue->qItem);
}
@ -89,6 +88,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
}
}
void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
queue->qItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}
void streamQueueProcessFail(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
@ -149,7 +159,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
bool streamQueueIsFull(const STaosQueue* pQueue) {
bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
double size = QUEUE_MEM_SIZE_IN_MB((STaosQueue*) pQueue);
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue));
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
}
@ -165,7 +175,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
if (qItem == NULL) {
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
@ -185,7 +195,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
@ -211,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
} else {
// previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS;
}
} else {
@ -227,7 +237,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
tstrerror(terrno));
}
streamQueueProcessFail(pTask->inputQueue);
streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS;
}
@ -235,7 +245,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
streamQueueProcessSuccess(pTask->inputInfo.queue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
@ -246,13 +256,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = taosQueueItemSize(pQueue) + 1;
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputQueue->queue)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
qError(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
@ -264,7 +275,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int32_t msgLen = px->submit.msgLen;
int64_t ver = px->submit.ver;
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
@ -276,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pTask->inputQueue->queue)) {
if (streamQueueIsFull(pQueue)) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
@ -284,27 +295,27 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
}
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return code;
}
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
taosWriteQitem(pQueue, pItem);
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem);
taosWriteQitem(pQueue, pItem);
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else {
ASSERT(0);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->info.triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status);
}
return 0;

View File

@ -65,7 +65,6 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__CK_READY: return "check-point-ready";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
default:return "";
@ -109,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
// check status
static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
SHistDataRange* pRange = &pTask->dataRange;
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
SStreamTaskCheckReq req = {
@ -366,7 +365,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}
int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -765,7 +764,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) {
SHistDataRange* pRange = &pTask->dataRange;
SDataRange* pRange = &pTask->dataRange;
if (pTask->info.fillHistory == 1) {
qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
"-%" PRId64,
@ -776,7 +775,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
} else {
SHistDataRange* pRange = &pTask->dataRange;
SDataRange* pRange = &pTask->dataRange;
int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) {

View File

@ -39,7 +39,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.streamId = streamId;
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->triggerParam = triggerParam;
pTask->info.triggerParam = triggerParam;
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
@ -47,7 +47,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
addToTaskset(pTaskList, pTask);
@ -133,7 +133,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
@ -210,7 +210,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);
@ -273,9 +273,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMsleep(10);
}
if (pTask->schedTimer != NULL) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
if (pTask->schedInfo.pTimer != NULL) {
taosTmrStop(pTask->schedInfo.pTimer);
pTask->schedInfo.pTimer = NULL;
}
if (pTask->launchTaskTimer != NULL) {
@ -284,8 +284,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
if (pTask->inputInfo.queue) {
streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
}
if (pTask->outputInfo.queue) {
@ -355,16 +355,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->inputInfo.queue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) {
qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return -1;
}
pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;