refactor: internal refactoring of stream task structures and exec APIs.

Haojun Liao 2023-10-07 19:16:55 +08:00
parent 940d6a3cb4
commit af2c0c95e9
12 changed files with 188 additions and 164 deletions

View File

@ -290,6 +290,8 @@ typedef struct SSTaskBasicInfo {
} SSTaskBasicInfo;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data
@ -301,11 +303,10 @@ typedef struct SDispatchMsgInfo {
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo;
typedef struct STaskOutputInfo {
int8_t type;
typedef struct STaskOutputQueue {
int8_t status;
SStreamQueue* queue;
} STaskOutputInfo;
} STaskOutputQueue;
typedef struct STaskInputInfo {
int8_t status;
@ -348,29 +349,7 @@ typedef struct SHistoryTaskInfo {
int32_t waitInterval;
} SHistoryTaskInfo;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputInfo outputInfo;
STaskInputInfo inputInfo;
STaskSchedInfo schedInfo;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SDataRange dataRange;
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // protects setting the task status and putting data into the inputQ
SArray* pUpstreamInfoList;
// output
typedef struct STaskOutputInfo {
union {
STaskDispatcherFixed fixedDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
@ -379,11 +358,38 @@ struct SStreamTask {
STaskSinkFetch fetchSink;
};
void* pTimer; // timer for launching sink tasks
int8_t type;
STokenBucket* pTokenBucket;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
SArray* pRspMsgList;
} STaskOutputInfo;
typedef struct SUpstreamInfo {
SArray* pList;
int32_t numOfClosed;
} SUpstreamInfo;
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputQueue outputq;
STaskInputInfo inputInfo;
STaskSchedInfo schedInfo; // todo remove it
STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SDataRange dataRange;
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // protects setting the task status and putting data into the inputQ
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
SArray* pRspMsgList;
SUpstreamInfo upstreamInfo;
// the following attributes are not serialized
int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
@ -669,7 +675,6 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
@ -677,7 +682,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieve
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
@ -693,6 +698,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
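For orientation, a minimal self-contained sketch (simplified stand-ins, not the actual TDengine header) of how this hunk regroups the task fields: the queue state splits off into STaskOutputQueue, the dispatcher/sink union together with the type, timer, and token bucket moves into the relocated STaskOutputInfo, and the bare pUpstreamInfoList plus its new closed-counter fold into SUpstreamInfo.

#include <stdint.h>

typedef struct SArray       SArray;        /* opaque stand-ins for the real types */
typedef struct SStreamQueue SStreamQueue;
typedef struct STokenBucket STokenBucket;

typedef struct STaskOutputQueue {          /* queue state only: pTask->outputq */
  int8_t        status;
  SStreamQueue* queue;
} STaskOutputQueue;

typedef struct STaskOutputInfo {           /* how/where the output goes: pTask->outputInfo */
  /* union of fixed/shuffle dispatchers and table/sma/fetch sinks elided */
  void*         pTimer;                    /* timer for launching sink tasks */
  int8_t        type;
  STokenBucket* pTokenBucket;              /* sink-side rate limiting */
} STaskOutputInfo;

typedef struct SUpstreamInfo {             /* replaces SArray* pUpstreamInfoList */
  SArray*  pList;
  int32_t  numOfClosed;                    /* upstream inputs closed for checkpointing */
} SUpstreamInfo;

Call sites accordingly change from pTask->outputInfo.queue to pTask->outputq.queue, from pTask->fixedDispatcher/tbSink/... to pTask->outputInfo.fixedDispatcher/tbSink/..., and from pTask->pUpstreamInfoList to pTask->upstreamInfo.pList, as the remaining files in this commit show.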

View File

@ -87,15 +87,17 @@ END:
}
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
STaskOutputInfo* pInfo = &pTask->outputInfo;
if (pStream->smaId != 0) {
pTask->outputInfo.type = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
pInfo->type = TASK_OUTPUT__SMA;
pInfo->smaSink.smaId = pStream->smaId;
} else {
pTask->outputInfo.type = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pTask->tbSink.pSchemaWrapper == NULL) {
pInfo->type = TASK_OUTPUT__TABLE;
pInfo->tbSink.stbUid = pStream->targetStbUid;
memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pInfo->tbSink.pSchemaWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -113,7 +115,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
isShuffle = true;
pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
if (mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL) < 0) {
return -1;
}
}
@ -124,8 +126,8 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {

View File

@ -54,7 +54,7 @@ FAIL:
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -70,7 +70,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api);
@ -206,7 +206,7 @@ int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask) {
streamProcessRunReq(pTask);
streamExecTask(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
} else {

View File

@ -788,7 +788,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
@ -809,27 +809,27 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
// sink
if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
pTask->smaSink.smaSink = smaHandleRes;
pTask->outputInfo.smaSink.vnode = pTq->pVnode;
pTask->outputInfo.smaSink.smaSink = smaHandleRes;
} else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
pTask->outputInfo.tbSink.vnode = pTq->pVnode;
pTask->outputInfo.tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
int32_t ver1 = 1;
SMetaInfo info = {0};
code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
code = metaGetInfo(pTq->pVnode->pMeta, pTask->outputInfo.tbSink.stbUid, &info, NULL);
if (code == TSDB_CODE_SUCCESS) {
ver1 = info.skmVer;
}
SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pTask->tbSink.pTSchema == NULL) {
SSchemaWrapper* pschemaWrapper = pTask->outputInfo.tbSink.pSchemaWrapper;
pTask->outputInfo.tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pTask->outputInfo.tbSink.pTSchema == NULL) {
return -1;
}
pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
pTask->outputInfo.tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->outputInfo.tbSink.pTblInfo, freePtr);
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1193,7 +1193,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
}
streamTryExec(pTask); // exec directly
streamExecTask(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
@ -1338,7 +1338,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask);
streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,

View File

@ -142,7 +142,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->tbSink.pTSchema;
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
SArray* tagArray = NULL;
int32_t code = 0;
@ -588,7 +588,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid) {
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->tbSink.stbUid;
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
while (pTableSinkInfo->uid == 0) {
@ -631,12 +631,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
char* dstTableName = pDataBlock->info.parTbName;
int32_t numOfRows = pDataBlock->info.rows;
const char* id = pTask->id.idStr;
int64_t suid = pTask->tbSink.stbUid;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int64_t suid = pTask->outputInfo.tbSink.stbUid;
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode);
STableSinkInfo* pTableSinkInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
if (alreadyCached) {
if (dstTableName[0] == 0) { // data block does not set the destination table name
@ -702,7 +702,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
}
pTableSinkInfo->uid = 0;
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
} else {
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) {
@ -716,7 +716,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr);
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
}
}
}
@ -730,11 +730,11 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo
const char* id = pTask->id.idStr;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, pTask->tbSink.stbUid);
id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
int32_t code = doConvertRows(pTableData, pTask->tbSink.pTSchema, pDataBlock, id);
int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
return code;
@ -759,9 +759,9 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int64_t suid = pTask->outputInfo.tbSink.stbUid;
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode);
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
int32_t code = TSDB_CODE_SUCCESS;

View File

@ -126,7 +126,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
} else {
stDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
}
return 0;
@ -239,8 +239,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
status = TASK_INPUT_STATUS__BLOCKED;
} else {
// This task has received the checkpoint req from the upstream task, from which all the messages should be
// blocked
// blocked. Note that there is no race condition here.
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
}
@ -274,13 +275,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
return 0;
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) {
return -1;
}
return 0;
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
@ -291,15 +285,17 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
if (num == 0) {
return;
}
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->dataAllowed = true;
}
pTask->upstreamInfo.numOfClosed = 0;
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
@ -310,9 +306,9 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
}
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->taskId == taskId) {
return pInfo;
}
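The numOfClosed counter added here pairs with the streamTaskAllUpstreamClosed() declaration in the header hunk: streamTaskOpenAllUpstreamInput resets it to 0, and each checkpoint-trigger increments it before closing that upstream's input. A hedged sketch of what the predicate can then reduce to (an assumed body, using only helpers that appear elsewhere in this diff):

static bool allUpstreamClosedSketch(SStreamTask* pTask) {
  /* all upstream inputs are closed once the counter catches up with the list size */
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
  return atomic_load_32(&pTask->upstreamInfo.numOfClosed) == num;
}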

View File

@ -92,7 +92,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
if (old == 0) {
stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
@ -153,7 +153,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
@ -192,14 +192,14 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
if (pTask->chkInfo.startTs == 0) {
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
@ -210,7 +210,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// some upstream tasks have not sent the checkpoint request yet; do nothing and wait for them
int32_t notReady = streamAlignCheckpoint(pTask);
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
if (notReady > 0) {
stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
id, pTask->info.selfChildId, notReady, num);
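The alignment logic above follows a first-writer-initializes pattern: the first checkpoint block to arrive compare-and-swaps checkpointAlignCnt from 0 to the number of upstream tasks, and every arrival then consumes one slot until the count drains to zero. A condensed sketch of that idiom (the decrement is implied by the notReady usage above, not shown verbatim in the hunk):

static int32_t alignCheckpointSketch(SStreamTask* pTask) {
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
  /* only the first caller succeeds in initializing the counter */
  (void)atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
  /* each arriving checkpoint block consumes one slot; 0 means fully aligned */
  return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
}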

View File

@ -195,11 +195,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.retrieveLen = dataStrLen,
};
int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
req.reqId = tGenIdPI64();
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId;
int32_t len;
@ -288,7 +288,7 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
? 1
: taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
}
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
@ -301,7 +301,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -318,10 +318,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.pData = pReq;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
@ -352,7 +352,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
@ -381,16 +381,16 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
const char* id = pTask->id.idStr;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
int32_t vgId = pTask->fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet);
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
@ -421,12 +421,12 @@ static void doRetryDispatchData(void* param, void* tmrId) {
int32_t msgId = pTask->execInfo.dispatch;
if (streamTaskShouldStop(&pTask->status)) {
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = 0;
{
@ -436,7 +436,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SStreamDispatchReq *pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfFailed = taosArrayGetSize(pList);
@ -462,9 +462,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
} else {
int32_t vgId = pTask->fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
@ -476,7 +476,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(&pTask->status)) {
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(&pTask->status)) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
@ -487,7 +487,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
}
} else {
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
}
}
@ -508,7 +508,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) {
uint32_t hashValue = 0;
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
}
@ -528,14 +528,14 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName);
@ -560,7 +560,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
@ -576,27 +576,27 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
}
// to make sure only one dispatch is running
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
atomic_val_compare_exchange_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
stDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old);
return 0;
}
ASSERT(pTask->msgInfo.pData == NULL);
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputInfo.status);
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue);
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputq.queue);
if (pBlock == NULL) {
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputInfo.status);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status);
return 0;
}
@ -620,10 +620,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputInfo.status, retryCount);
pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount);
// todo: handle the partially-successful dispatch case
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
@ -631,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
@ -654,11 +654,11 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
// serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamTaskId = pTask->fixedDispatcher.taskId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
pTask->notReadyTasks = 1;
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
@ -680,7 +680,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
@ -1049,7 +1049,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatching the first block to the downstream task in the pipeline
streamDispatchStreamBlock(pTask);
@ -1061,11 +1061,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch;
// followers do not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
// discard invalid dispatch rsp msg
if ((pRsp->msgId != msgId) || (pRsp->stage != pTask->pMeta->stage)) {
stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64
" discard it",
@ -1107,7 +1109,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t leftRsp = 0;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
ASSERT(leftRsp >= 0);
if (leftRsp > 0) {
@ -1127,17 +1129,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// all msg rsp already, continue
if (leftRsp == 0) {
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
// we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);
if (numOfFailed > 0) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed);
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt);
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed);
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
}
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
@ -1155,7 +1157,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
} else {
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
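Two recurring changes in this file are worth calling out. First, the local ref variables for status.timerActive widen from int8_t to int32_t so the snapshot matches the 32-bit atomic it is read from and the logged ref count cannot truncate. Second, dispatching stays single-flight through a compare-and-swap on the new outputq.status; a condensed sketch of that guard as used in streamDispatchStreamBlock:

/* sketch of the single-flight dispatch guard */
int8_t old = atomic_val_compare_exchange_8(&pTask->outputq.status,
                                           TASK_OUTPUT_STATUS__NORMAL,
                                           TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
  return 0;  /* another thread already owns the dispatch; its rsp handler continues */
}
/* ... build and send the dispatch msg ... */
/* on success rsp (or when the outputQ turns out empty), release ownership: */
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);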

View File

@ -36,10 +36,10 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
@ -487,7 +487,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
@ -607,7 +607,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
pTask->status.taskStatus == TASK_STATUS__DROPPING);
}
int32_t streamTryExec(SStreamTask* pTask) {
int32_t streamExecTask(SStreamTask* pTask) {
// this function may be executed by multiple threads, so a status check is required.
const char* id = pTask->id.idStr;
@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) {
int32_t code = streamExecForAll(pTask);
if (code < 0) { // todo this status shoudl be removed
if (code < 0) { // todo this status should be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
}
@ -663,7 +663,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask) {
}
int32_t streamAlignTransferState(SStreamTask* pTask) {
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream);
if (old == 0) {
stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
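For reference, a condensed sketch of the drive loop streamExecTask (renamed from streamTryExec) implements, pieced together from the hunk above; the sched-status acquisition and the loop-exit condition lie outside the shown context, so they appear here only as comments:

int32_t streamExecTaskSketch(SStreamTask* pTask) {
  /* acquire schedStatus; proceed only if it was TASK_SCHED_STATUS__WAITING */
  while (1) {
    if (streamExecForAll(pTask) < 0) {  /* todo: this status should be removed */
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
      return -1;
    }
    break;  /* exit once the inputQ is drained or the task should stop (not in the hunk) */
  }
  return 0;
}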

View File

@ -892,7 +892,7 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate;
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}

View File

@ -18,6 +18,7 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 40
#define SINK_TASK_IDLE_DURATION 200 // 200 ms
// todo refactor:
// read data from input queue
@ -154,6 +155,10 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
}
}
static void doLaunchSinkTask(void* param, void* tmrId) {
}
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) {
int32_t retryTimes = 0;
@ -166,8 +171,21 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*blockSize = 0;
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) {
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
// if (streamTaskAllUpstreamClosed(pTask)) {
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
// stDebug("s-task:%s try start task in %dms in tmr, since all upstream inputQ is closed, ref:%d", pTask->id.idStr,
// SINK_TASK_IDLE_DURATION, ref);
//
// if (pTask->outputInfo.pTimer == NULL) {
// pTask->outputInfo.pTimer = taosTmrStart(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer);
// } else {
// taosTmrReset(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer, &pTask->outputInfo.pTimer);
// }
// }
return TSDB_CODE_SUCCESS;
}
@ -188,10 +206,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (*numOfBlocks > 0) {
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
} else {
streamTaskPutbackToken(pTask->pTokenBucket);
streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
}
return TSDB_CODE_SUCCESS;
@ -207,7 +225,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
// restore the token to bucket in case of checkpoint/trans-state msg
streamTaskPutbackToken(pTask->pTokenBucket);
streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
*blockSize = 0;
*numOfBlocks = 1;
*pInput = qItem;
@ -216,7 +234,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
@ -237,7 +255,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
@ -255,7 +273,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
return TSDB_CODE_SUCCESS;
@ -350,15 +368,15 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
// the result should be put into the outputQ in all cases; otherwise, the result may be lost
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
STaosQueue* pQueue = pTask->outputq.queue->pQueue;
while (streamQueueIsFull(pTask->outputInfo.queue)) {
while (streamQueueIsFull(pTask->outputq.queue)) {
if (streamTaskShouldStop(&pTask->status)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
// wait until there is enough space to hold this result pBlock
stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
@ -368,7 +386,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t code = taosWriteQitem(pQueue, pBlock);
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
if (code != 0) {
stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
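The sink throttling path in streamTaskGetDataFromInputQ reads as: take a token from outputInfo.pTokenBucket before extracting a batch, put the token back when the extracted item is a control message (checkpoint/trans-state) or nothing was extracted, and otherwise charge the batch size against the byte quota. A minimal self-contained sketch of that token-bucket idiom (simplified names, not the TDengine implementation):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t tokens;      /* remaining tokens in the current window */
  double  quotaLeft;   /* remaining byte quota in the current window (MiB) */
} TokenBucketSketch;

static bool takeToken(TokenBucketSketch* pBucket) {
  if (pBucket->tokens <= 0) return false;  /* caller should back off and retry */
  pBucket->tokens -= 1;
  return true;
}

static void putbackToken(TokenBucketSketch* pBucket) { pBucket->tokens += 1; }

static void consumeQuota(TokenBucketSketch* pBucket, double size) {
  pBucket->quotaLeft -= size;  /* a refill timer would restore tokens/quota per window */
}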

View File

@ -40,7 +40,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
int32_t vgId = pMeta->vgId;
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus));
@ -144,8 +144,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
// serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedDispatcher.taskId;
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
pTask->checkReqId = req.reqId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
@ -153,9 +153,9 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
@ -225,9 +225,9 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->fixedDispatcher.epSet);
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
@ -241,7 +241,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
}
destroyRecheckInfo(pInfo);
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
}
@ -341,7 +341,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask, numOfReqs);
} else {
int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
@ -367,7 +367,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} else {
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
@ -528,7 +528,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
ASSERT(left >= 0);
if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug(
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
"rsp to all upstream tasks",
@ -640,7 +640,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
@ -672,7 +672,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
// not in timer anymore
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
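Both timer callbacks in this file (recheckDownstreamTasks and tryLaunchHistoryTask) follow the ref-count discipline the widened ref variables make visible: take one reference on status.timerActive when arming the timer, and release exactly one reference on every exit path of the callback. A sketch of the idiom with the atomic helpers used in this diff:

/* arming side (sketch) */
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
/* ... taosTmrStart/taosTmrReset the callback ... */

/* callback side: every return path must do this exactly once (sketch) */
int32_t refAfter = atomic_sub_fetch_32(&pTask->status.timerActive, 1);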