refactor: do some internal refactoring.
commit ab17bfdf09
parent 0e8a8aabc1
@@ -290,6 +290,8 @@ typedef struct SSTaskBasicInfo {
 } SSTaskBasicInfo;
 
 typedef struct SStreamDispatchReq SStreamDispatchReq;
+typedef struct STokenBucket STokenBucket;
+typedef struct SMetaHbInfo SMetaHbInfo;
 
 typedef struct SDispatchMsgInfo {
   SStreamDispatchReq* pData;  // current dispatch data
@@ -301,11 +303,10 @@ typedef struct SDispatchMsgInfo {
   void* pTimer;  // used to dispatch data after a given time duration
 } SDispatchMsgInfo;
 
-typedef struct STaskOutputInfo {
-  int8_t type;
+typedef struct STaskOutputQueue {
   int8_t status;
   SStreamQueue* queue;
-} STaskOutputInfo;
+} STaskOutputQueue;
 
 typedef struct STaskInputInfo {
   int8_t status;
@@ -348,29 +349,7 @@ typedef struct SHistoryTaskInfo {
   int32_t waitInterval;
 } SHistoryTaskInfo;
 
-typedef struct STokenBucket STokenBucket;
-typedef struct SMetaHbInfo SMetaHbInfo;
-
-struct SStreamTask {
-  int64_t ver;
-  SStreamTaskId id;
-  SSTaskBasicInfo info;
-  STaskOutputInfo outputInfo;
-  STaskInputInfo inputInfo;
-  STaskSchedInfo schedInfo;
-  SDispatchMsgInfo msgInfo;
-  SStreamStatus status;
-  SCheckpointInfo chkInfo;
-  STaskExec exec;
-  SDataRange dataRange;
-  SHistoryTaskInfo hTaskInfo;
-  STaskId streamTaskId;
-  STaskExecStatisInfo execInfo;
-  SArray* pReadyMsgList;  // SArray<SStreamChkptReadyInfo*>
-  TdThreadMutex lock;     // secure the operation of set task status and puting data into inputQ
-  SArray* pUpstreamInfoList;
-
-  // output
+typedef struct STaskOutputInfo {
   union {
     STaskDispatcherFixed fixedDispatcher;
     STaskDispatcherShuffle shuffleDispatcher;
@@ -379,11 +358,38 @@ struct SStreamTask {
     STaskSinkFetch fetchSink;
   };
 
   void* pTimer;  // timer for launch sink tasks
+  int8_t type;
   STokenBucket* pTokenBucket;
-  SMsgCb* pMsgCb;        // msg handle
-  SStreamState* pState;  // state backend
-  SArray* pRspMsgList;
+} STaskOutputInfo;
+
+typedef struct SUpstreamInfo {
+  SArray* pList;
+  int32_t numOfClosed;
+} SUpstreamInfo;
+
+struct SStreamTask {
+  int64_t ver;
+  SStreamTaskId id;
+  SSTaskBasicInfo info;
+  STaskOutputQueue outputq;
+  STaskInputInfo inputInfo;
+  STaskSchedInfo schedInfo;  // todo remove it
+  STaskOutputInfo outputInfo;
+  SDispatchMsgInfo msgInfo;
+  SStreamStatus status;
+  SCheckpointInfo chkInfo;
+  STaskExec exec;
+  SDataRange dataRange;
+  SHistoryTaskInfo hTaskInfo;
+  STaskId streamTaskId;
+  STaskExecStatisInfo execInfo;
+  SArray* pReadyMsgList;  // SArray<SStreamChkptReadyInfo*>
+  TdThreadMutex lock;     // secure the operation of set task status and puting data into inputQ
+  SMsgCb* pMsgCb;        // msg handle
+  SStreamState* pState;  // state backend
+  SArray* pRspMsgList;
+  SUpstreamInfo upstreamInfo;
   // the followings attributes don't be serialized
   int32_t notReadyTasks;
   int32_t numOfWaitingUpstream;
@@ -669,7 +675,6 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
 
 int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
 
-int32_t streamProcessRunReq(SStreamTask* pTask);
 int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
 int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
 
@@ -677,7 +682,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieve
 SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
 
 void streamTaskInputFail(SStreamTask* pTask);
-int32_t streamTryExec(SStreamTask* pTask);
+int32_t streamExecTask(SStreamTask* pTask);
 int32_t streamSchedExec(SStreamTask* pTask);
 bool streamTaskShouldStop(const SStreamStatus* pStatus);
 bool streamTaskShouldPause(const SStreamStatus* pStatus);
@@ -693,6 +698,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
 int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
 void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
+bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
 bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
 int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
 int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
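
Note on the header hunks above: the old three-field STaskOutputInfo (type/status/queue) is split so that STaskOutputQueue keeps only the queue and its status, while a new STaskOutputInfo gathers the dispatcher/sink union, the sink timer, the output type, and the token bucket; the flat SArray* pUpstreamInfoList becomes an SUpstreamInfo that also tracks how many upstream inputs are closed. A standalone sketch of the resulting shape — stub types only, not the real TDengine definitions:

#include <stdint.h>

typedef struct SArrayStub SArrayStub;  /* stands in for SArray */
typedef struct SQueueStub SQueueStub;  /* stands in for SStreamQueue */

typedef struct STaskOutputQueue {      /* was STaskOutputInfo: queue + status only */
  int8_t      status;
  SQueueStub* queue;
} STaskOutputQueue;

typedef struct SUpstreamInfo {         /* replaces the bare SArray* pUpstreamInfoList */
  SArrayStub* pList;
  int32_t     numOfClosed;             /* upstream inputQs currently closed */
} SUpstreamInfo;
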
@@ -87,15 +87,17 @@ END:
 }
 
 int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
+  STaskOutputInfo* pInfo = &pTask->outputInfo;
+
   if (pStream->smaId != 0) {
-    pTask->outputInfo.type = TASK_OUTPUT__SMA;
-    pTask->smaSink.smaId = pStream->smaId;
+    pInfo->type = TASK_OUTPUT__SMA;
+    pInfo->smaSink.smaId = pStream->smaId;
   } else {
-    pTask->outputInfo.type = TASK_OUTPUT__TABLE;
-    pTask->tbSink.stbUid = pStream->targetStbUid;
-    memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
-    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
-    if (pTask->tbSink.pSchemaWrapper == NULL) {
+    pInfo->type = TASK_OUTPUT__TABLE;
+    pInfo->tbSink.stbUid = pStream->targetStbUid;
+    memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
+    pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
+    if (pInfo->tbSink.pSchemaWrapper == NULL) {
       return TSDB_CODE_OUT_OF_MEMORY;
     }
   }
@@ -113,7 +115,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
       isShuffle = true;
       pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
       pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
-      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
+      if (mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL) < 0) {
         return -1;
       }
     }
@@ -124,8 +126,8 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
   int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
 
   if (isShuffle) {
-    memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
-    SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
+    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
 
     int32_t numOfVgroups = taosArrayGetSize(pVgs);
     for (int32_t i = 0; i < numOfVgroups; i++) {
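
mndSetSinkTaskInfo now takes a local STaskOutputInfo* alias once and writes all sink fields through it, so the next relocation of the sink union touches a single line. A minimal sketch of that pattern with hypothetical stub types:

#include <stdint.h>

typedef struct { int8_t type; } OutInfoStub;
typedef struct { OutInfoStub outputInfo; } TaskStub;

static void setSinkType(TaskStub* pTask, int8_t sinkType) {
  OutInfoStub* pInfo = &pTask->outputInfo;  /* the only line that knows the path */
  pInfo->type = sinkType;
}
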
@@ -54,7 +54,7 @@ FAIL:
 }
 
 int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
-  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
+  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
   int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
   if (code != TSDB_CODE_SUCCESS) {
     return code;
@@ -70,7 +70,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
     qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
   }
 
-  int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
   SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
   initStreamStateAPI(&handle.api);
 
@@ -206,7 +206,7 @@ int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
   if (pTask) {
-    streamProcessRunReq(pTask);
+    streamExecTask(pTask);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     return 0;
   } else {
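
streamProcessRunReq was a thin wrapper that merely normalized streamTryExec's return value; this commit deletes it and has the snode (and the tq code below) call the renamed streamExecTask directly. An illustrative before/after with stub signatures, assuming nothing beyond what the hunks show:

static int streamExecTaskStub(void* pTask) { (void)pTask; return 0; }  /* renamed from streamTryExec */

/* old shape: every run request went through a do-nothing wrapper */
static int streamProcessRunReqStub(void* pTask) {
  return (streamExecTaskStub(pTask) < 0) ? -1 : 0;
}
/* new shape: former call sites invoke streamExecTaskStub(pTask) directly */
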
@@ -797,7 +797,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
     tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
   }
 
-  int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
   SReadHandle handle = {
       .checkpointId = pTask->chkInfo.checkpointId,
       .vnode = NULL,
@@ -818,27 +818,27 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
 
   // sink
   if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
-    pTask->smaSink.vnode = pTq->pVnode;
-    pTask->smaSink.smaSink = smaHandleRes;
+    pTask->outputInfo.smaSink.vnode = pTq->pVnode;
+    pTask->outputInfo.smaSink.smaSink = smaHandleRes;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
-    pTask->tbSink.vnode = pTq->pVnode;
-    pTask->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
+    pTask->outputInfo.tbSink.vnode = pTq->pVnode;
+    pTask->outputInfo.tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
 
     int32_t ver1 = 1;
     SMetaInfo info = {0};
-    code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
+    code = metaGetInfo(pTq->pVnode->pMeta, pTask->outputInfo.tbSink.stbUid, &info, NULL);
     if (code == TSDB_CODE_SUCCESS) {
       ver1 = info.skmVer;
     }
 
-    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
-    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
-    if (pTask->tbSink.pTSchema == NULL) {
+    SSchemaWrapper* pschemaWrapper = pTask->outputInfo.tbSink.pSchemaWrapper;
+    pTask->outputInfo.tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
+    if (pTask->outputInfo.tbSink.pTSchema == NULL) {
       return -1;
     }
 
-    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
-    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
+    pTask->outputInfo.tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
+    tSimpleHashSetFreeFp(pTask->outputInfo.tbSink.pTblInfo, freePtr);
   }
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@@ -1202,7 +1202,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
             streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
     }
 
-    streamTryExec(pTask);  // exec directly
+    streamExecTask(pTask);  // exec directly
   } else {
     STimeWindow* pWindow = &pTask->dataRange.window;
     tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
@@ -1347,7 +1347,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
   if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
     tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
             pTask->chkInfo.nextProcessVer);
-    streamProcessRunReq(pTask);
+    streamExecTask(pTask);
   } else {
     int8_t status = streamTaskSetSchedStatusInActive(pTask);
     tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
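
tqExpandTask wires the vnode's sink callbacks into the union that now lives inside outputInfo. A sketch of that type-based dispatch; the layout mirrors the diff but everything here is a stub:

#include <stdint.h>

enum { SINK_SMA_STUB = 1, SINK_TABLE_STUB = 2 };

typedef struct {
  int8_t type;
  union {
    struct { void* vnode; void (*smaSink)(void*); }    smaSink;
    struct { void* vnode; void (*tbSinkFunc)(void*); } tbSink;
  };
} OutputInfoStub;

static void wireSink(OutputInfoStub* pInfo, void* pVnode,
                     void (*smaFn)(void*), void (*tbFn)(void*)) {
  if (pInfo->type == SINK_SMA_STUB) {
    pInfo->smaSink.vnode = pVnode;
    pInfo->smaSink.smaSink = smaFn;
  } else if (pInfo->type == SINK_TABLE_STUB) {
    pInfo->tbSink.vnode = pVnode;
    pInfo->tbSink.tbSinkFunc = tbFn;
  }
}
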
@@ -142,7 +142,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
                                             int64_t suid) {
   tqDebug("s-task:%s build create table msg", pTask->id.idStr);
 
-  STSchema* pTSchema = pTask->tbSink.pTSchema;
+  STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
   int32_t rows = pDataBlock->info.rows;
   SArray* tagArray = NULL;
   int32_t code = 0;
@@ -588,7 +588,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
 int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                  const char* dstTableName, int64_t* uid) {
   int32_t vgId = TD_VID(pVnode);
-  int64_t suid = pTask->tbSink.stbUid;
+  int64_t suid = pTask->outputInfo.tbSink.stbUid;
   const char* id = pTask->id.idStr;
 
   while (pTableSinkInfo->uid == 0) {
@@ -631,12 +631,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
   char* dstTableName = pDataBlock->info.parTbName;
   int32_t numOfRows = pDataBlock->info.rows;
   const char* id = pTask->id.idStr;
-  int64_t suid = pTask->tbSink.stbUid;
-  STSchema* pTSchema = pTask->tbSink.pTSchema;
+  int64_t suid = pTask->outputInfo.tbSink.stbUid;
+  STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
   int32_t vgId = TD_VID(pVnode);
   STableSinkInfo* pTableSinkInfo = NULL;
 
-  bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
+  bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
 
   if (alreadyCached) {
     if (dstTableName[0] == 0) {  // data block does not set the destination table name
@@ -702,7 +702,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
       }
 
       pTableSinkInfo->uid = 0;
-      doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
+      doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
     } else {
       bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
       if (!isValid) {
@@ -716,7 +716,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
         pTableSinkInfo->uid = mr.me.uid;
 
         metaReaderClear(&mr);
-        doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
+        doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
       }
     }
   }
@@ -730,11 +730,11 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo
   const char* id = pTask->id.idStr;
 
   tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
-          id, blockIndex + 1, numOfRows, pTask->tbSink.stbUid);
+          id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid);
   char* dstTableName = pDataBlock->info.parTbName;
 
   // convert all rows
-  int32_t code = doConvertRows(pTableData, pTask->tbSink.pTSchema, pDataBlock, id);
+  int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id);
   if (code != TSDB_CODE_SUCCESS) {
     tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
     return code;
@@ -759,9 +759,9 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
 void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
   const SArray* pBlocks = (const SArray*)data;
   SVnode* pVnode = (SVnode*)vnode;
-  int64_t suid = pTask->tbSink.stbUid;
-  char* stbFullName = pTask->tbSink.stbFullName;
-  STSchema* pTSchema = pTask->tbSink.pTSchema;
+  int64_t suid = pTask->outputInfo.tbSink.stbUid;
+  char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
+  STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
   int32_t vgId = TD_VID(pVnode);
   int32_t numOfBlocks = taosArrayGetSize(pBlocks);
   int32_t code = TSDB_CODE_SUCCESS;
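
Every tqSink access moves from pTask->tbSink.X to pTask->outputInfo.tbSink.X; functions that touch several sink fields hoist them into locals once at entry, which keeps the longer path out of per-row loops. A stub-typed sketch of that hoisting:

typedef struct { long stbUid; char stbFullName[64]; } TbSinkStub;
typedef struct { TbSinkStub tbSink; } OutputStub;
typedef struct { OutputStub outputInfo; } SinkTaskStub;

static void sinkBlocks(SinkTaskStub* pTask) {
  long  suid        = pTask->outputInfo.tbSink.stbUid;      /* read once ... */
  char* stbFullName = pTask->outputInfo.tbSink.stbFullName; /* ... reuse below */
  (void)suid; (void)stbFullName;
}
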
@@ -126,7 +126,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
     SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
     tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
   } else {
-    stDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
+    stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
   }
 
   return 0;
@@ -239,8 +239,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
       status = TASK_INPUT_STATUS__BLOCKED;
     } else {
       // This task has received the checkpoint req from the upstream task, from which all the messages should be
-      // blocked
+      // blocked. Note that there is no race condition here.
       if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
+        atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
         streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
         stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
       }
@@ -274,13 +275,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
   return 0;
 }
 
-int32_t streamProcessRunReq(SStreamTask* pTask) {
-  if (streamTryExec(pTask) < 0) {
-    return -1;
-  }
-  return 0;
-}
-
 int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
   streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
   ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
@@ -291,15 +285,17 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
 void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
 
 void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
-  int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
   if (num == 0) {
     return;
   }
 
   for (int32_t i = 0; i < num; ++i) {
-    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
+    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
     pInfo->dataAllowed = true;
   }
+
+  pTask->upstreamInfo.numOfClosed = 0;
 }
 
 void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
@@ -310,9 +306,9 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
 }
 
 SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
-  int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
   for (int32_t i = 0; i < num; ++i) {
-    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
+    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
     if (pInfo->taskId == taskId) {
       return pInfo;
     }
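
The checkpoint-trigger path now pairs streamTaskCloseUpstreamInput with an atomic bump of upstreamInfo.numOfClosed, and streamTaskOpenAllUpstreamInput resets the counter when inputs reopen. The diff uses TDengine's atomic_add_fetch_32; the same idea in C11 atomics:

#include <stdatomic.h>

typedef struct { atomic_int numOfClosed; } UpstreamStub;

static void onCheckpointTrigger(UpstreamStub* pUp) {
  atomic_fetch_add(&pUp->numOfClosed, 1);  /* one more upstream inputQ closed */
}

static void openAllUpstream(UpstreamStub* pUp) {
  atomic_store(&pUp->numOfClosed, 0);      /* all inputs allowed again */
}
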
@@ -92,7 +92,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
 }
 
 static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
-  int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
   int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
   if (old == 0) {
     stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
@@ -153,7 +153,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
   pBlock->srcTaskId = pTask->id.taskId;
   pBlock->srcVgId = pTask->pMeta->vgId;
 
-  int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
+  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
   if (code == 0) {
     streamDispatchStreamBlock(pTask);
   } else {
@@ -192,14 +192,14 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
   int32_t taskLevel = pTask->info.taskLevel;
   if (taskLevel == TASK_LEVEL__SOURCE) {
     if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      stDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
+      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
       continueDispatchCheckpointBlock(pBlock, pTask);
     } else {  // only one task exists, no need to dispatch downstream info
       streamProcessCheckpointReadyMsg(pTask);
       streamFreeQitem((SStreamQueueItem*)pBlock);
     }
   } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
-    ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
+    ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
     if (pTask->chkInfo.startTs == 0) {
       pTask->chkInfo.startTs = taosGetTimestampMs();
       pTask->execInfo.checkpoint += 1;
@@ -210,7 +210,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
 
     // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
     int32_t notReady = streamAlignCheckpoint(pTask);
-    int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
+    int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
     if (notReady > 0) {
       stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
               id, pTask->info.selfChildId, notReady, num);
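
streamAlignCheckpoint seeds its alignment counter with a compare-and-swap so only the first arriving checkpoint-trigger initializes it to the upstream count. Assuming the counter is then decremented per trigger (the decrement itself is outside these hunks), the pattern is roughly:

#include <stdatomic.h>

/* returns how many upstream triggers are still outstanding */
static int alignCheckpoint(atomic_int* pAlignCnt, int numOfUpstream) {
  int expected = 0;
  atomic_compare_exchange_strong(pAlignCnt, &expected, numOfUpstream);  /* first caller only */
  return atomic_fetch_sub(pAlignCnt, 1) - 1;
}
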
@@ -195,11 +195,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
       .retrieveLen = dataStrLen,
   };
 
-  int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
   ASSERT(sz > 0);
   for (int32_t i = 0; i < sz; i++) {
     req.reqId = tGenIdPI64();
-    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
+    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
     req.dstNodeId = pEpInfo->nodeId;
     req.dstTaskId = pEpInfo->taskId;
     int32_t len;
@@ -288,7 +288,7 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
 int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
   return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
              ? 1
-             : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
+             : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
 }
 
 static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
@@ -301,7 +301,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
 
-    int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
+    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
     code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
     if (code != TSDB_CODE_SUCCESS) {
       return code;
@@ -318,10 +318,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
 
     pTask->msgInfo.pData = pReq;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
+    int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);
 
-    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
     SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
@@ -352,7 +352,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
       }
 
       if (pReqs[j].blockNum == 0) {
-        atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
+        atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
       }
 
       pReqs[j].blockNum++;
@@ -381,16 +381,16 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
   const char* id = pTask->id.idStr;
 
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    int32_t vgId = pTask->fixedDispatcher.nodeId;
-    SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
-    int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
+    int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
+    SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
+    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
 
     stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
             pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
 
     code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet);
   } else {
-    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
     stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
@@ -421,12 +421,12 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   int32_t msgId = pTask->execInfo.dispatch;
 
   if (streamTaskShouldStop(&pTask->status)) {
-    int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
     return;
   }
 
-  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
+  ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
 
   int32_t code = 0;
   {
@@ -436,7 +436,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
     SStreamDispatchReq *pReq = pTask->msgInfo.pData;
 
     if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+      SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
       int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
       int32_t numOfFailed = taosArrayGetSize(pList);
@@ -462,9 +462,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
 
       stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
     } else {
-      int32_t vgId = pTask->fixedDispatcher.nodeId;
-      SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
-      int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
+      int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
+      SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
+      int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
 
       stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
               pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
@@ -476,7 +476,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   if (code != TSDB_CODE_SUCCESS) {
     if (!streamTaskShouldStop(&pTask->status)) {
       // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
-      // atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
+      // atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
       if (streamTaskShouldPause(&pTask->status)) {
         streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
       } else {
@@ -487,7 +487,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
       stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
     }
   } else {
-    int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
   }
 }
@@ -508,7 +508,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
 int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                 int64_t groupId) {
   uint32_t hashValue = 0;
-  SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+  SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
   if (pTask->pNameMap == NULL) {
     pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
   }
@@ -528,14 +528,14 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
   }
 
   if (pDataBlock->info.parTbName[0]) {
-    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
+    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
   } else {
-    buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
-    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
+    buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
+    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
   }
 
   /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
-  SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
+  SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
   hashValue =
       taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
   taosMemoryFree(ctbName);
@@ -560,7 +560,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
   }
 
   if (pReqs[j].blockNum == 0) {
-    atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
+    atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
   }
 
   pReqs[j].blockNum++;
@@ -576,27 +576,27 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
 
   const char* id = pTask->id.idStr;
-  int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
+  int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
   if (numOfElems > 0) {
-    double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
+    double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
     stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
   }
 
   // to make sure only one dispatch is running
   int8_t old =
-      atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
+      atomic_val_compare_exchange_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
   if (old != TASK_OUTPUT_STATUS__NORMAL) {
     stDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old);
     return 0;
   }
 
   ASSERT(pTask->msgInfo.pData == NULL);
-  stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputInfo.status);
+  stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
 
-  SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue);
+  SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputq.queue);
   if (pBlock == NULL) {
-    atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
-    stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputInfo.status);
+    atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
+    stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status);
     return 0;
   }
 
@@ -620,10 +620,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }
 
     stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
-            pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputInfo.status, retryCount);
+            pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount);
 
     // todo deal with only partially success dispatch case
-    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
+    atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
     if (terrno == TSDB_CODE_APP_IS_STOPPING) {  // in case of this error, do not retry anymore
       destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
       pTask->msgInfo.pData = NULL;
@@ -631,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }
 
     if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
-      int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
       stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
               pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
 
@@ -654,11 +654,11 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
 
   // serialize
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    req.downstreamTaskId = pTask->fixedDispatcher.taskId;
+    req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
     pTask->notReadyTasks = 1;
-    doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
+    doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgs = taosArrayGetSize(vgInfo);
     pTask->notReadyTasks = numOfVgs;
 
@@ -680,7 +680,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
 // this function is usually invoked by sink/agg task
 int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
   int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
-  ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
+  ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
 
   for (int32_t i = 0; i < num; ++i) {
     SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
@@ -1049,7 +1049,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
   }
 
   // now ready for next data output
-  atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
+  atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
 
   // otherwise, continue dispatch the first block to down stream task in pipeline
   streamDispatchStreamBlock(pTask);
@@ -1061,11 +1061,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   int32_t vgId = pTask->pMeta->vgId;
   int32_t msgId = pTask->execInfo.dispatch;
 
+  // follower not handle the dispatch rsp
   if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
     stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
     return TSDB_CODE_STREAM_TASK_NOT_EXIST;
   }
 
   // discard invalid dispatch rsp msg
   if ((pRsp->msgId != msgId) || (pRsp->stage != pTask->pMeta->stage)) {
     stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64
             " discard it",
@@ -1107,7 +1109,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
 
   int32_t leftRsp = 0;
   if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
+    leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
     ASSERT(leftRsp >= 0);
 
     if (leftRsp > 0) {
@@ -1127,17 +1129,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
 
   // all msg rsp already, continue
   if (leftRsp == 0) {
-    ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
+    ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
 
     // we need to re-try send dispatch msg to downstream tasks
     int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);
     if (numOfFailed > 0) {
       if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-        atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed);
-        stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt);
+        atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed);
+        stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
       }
 
-      int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
       stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
               pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
 
@@ -1155,7 +1157,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
     }
 
     // now ready for next data output
-    atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
+    atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
   } else {
     handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
   }
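
streamDispatchStreamBlock serializes itself through the renamed outputq.status: a compare-and-swap from NORMAL to WAIT admits exactly one in-flight dispatch, and the rsp handler stores NORMAL back before pulling the next block. The guard in C11 form, as a stub:

#include <stdatomic.h>

enum { OUT_NORMAL = 0, OUT_WAIT = 1 };

static int tryStartDispatch(atomic_char* pStatus) {
  char expected = OUT_NORMAL;
  /* NORMAL -> WAIT; failure means a dispatch is already waiting for its rsp */
  return atomic_compare_exchange_strong(pStatus, &expected, OUT_WAIT);
}

static void finishDispatch(atomic_char* pStatus) {
  atomic_store(pStatus, OUT_NORMAL);  /* ready for the next block */
}
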
@@ -36,10 +36,10 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
   int32_t code = 0;
   int32_t type = pTask->outputInfo.type;
   if (type == TASK_OUTPUT__TABLE) {
-    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
+    pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
     destroyStreamDataBlock(pBlock);
   } else if (type == TASK_OUTPUT__SMA) {
-    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
+    pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks);
     destroyStreamDataBlock(pBlock);
   } else {
     ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
@@ -487,7 +487,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
     // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
     if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
       pBlock->srcVgId = pTask->pMeta->vgId;
-      code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
+      code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
       if (code == 0) {
         streamDispatchStreamBlock(pTask);
       } else {
@@ -607,7 +607,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
           pTask->status.taskStatus == TASK_STATUS__DROPPING);
 }
 
-int32_t streamTryExec(SStreamTask* pTask) {
+int32_t streamExecTask(SStreamTask* pTask) {
   // this function may be executed by multi-threads, so status check is required.
   const char* id = pTask->id.idStr;
 
@@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
   if (schedStatus == TASK_SCHED_STATUS__WAITING) {
     while (1) {
       int32_t code = streamExecForAll(pTask);
-      if (code < 0) {  // todo this status shoudl be removed
+      if (code < 0) {  // todo this status should be removed
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
        return -1;
      }
@@ -663,7 +663,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask) {
 }
 
 int32_t streamAlignTransferState(SStreamTask* pTask) {
-  int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
+  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
   int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream);
   if (old == 0) {
     stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
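
streamTryExec is renamed streamExecTask but keeps the multi-thread guard noted in its first comment: only the caller that wins the sched-status transition actually drains the input queue. A stub sketch of such a gate — an assumption about the mechanism, since the real code goes through the streamTaskSetSchedStatus* helpers:

#include <stdatomic.h>

enum { SCHED_INACTIVE = 0, SCHED_WAITING = 1, SCHED_ACTIVE = 2 };

static int execTaskOnce(atomic_char* pSchedStatus) {
  char expected = SCHED_WAITING;
  if (!atomic_compare_exchange_strong(pSchedStatus, &expected, SCHED_ACTIVE)) {
    return 0;  /* another thread is executing this task */
  }
  /* ... run the streamExecForAll-style loop here ... */
  atomic_store(pSchedStatus, SCHED_INACTIVE);
  return 0;
}
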
@@ -891,7 +891,7 @@ void metaHbToMnode(void* param, void* tmrId) {
 
     entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
     if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
-      entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate;
+      entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
       entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
     }
 
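
The heartbeat reads the sink quota through the relocated token bucket; only the access path changes. A stub illustrating it:

typedef struct { double quotaRate; } TokenBucketStub;
typedef struct { TokenBucketStub* pTokenBucket; } OutInfoWithBucketStub;
typedef struct { OutInfoWithBucketStub outputInfo; } HbTaskStub;

static double sinkQuotaOf(const HbTaskStub* pTask) {
  return pTask->outputInfo.pTokenBucket->quotaRate;  /* was pTask->pTokenBucket */
}
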
@@ -18,6 +18,7 @@
 #define MAX_STREAM_EXEC_BATCH_NUM 32
 #define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
 #define WAIT_FOR_DURATION         40
+#define SINK_TASK_IDLE_DURATION   200  // 200 ms
 
 // todo refactor:
 // read data from input queue
@@ -154,6 +155,10 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
   }
 }
 
+static void doLaunchSinkTask(void* param, void* tmrId) {
+
+}
+
 int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                     int32_t* blockSize) {
   int32_t retryTimes = 0;
@@ -166,8 +171,21 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
   *blockSize = 0;
 
   // no available token in bucket for sink task, let's wait for a little bit
-  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) {
+  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
     stDebug("s-task:%s no available token in bucket for sink data, wait", id);
 
+    //    if (streamTaskAllUpstreamClosed(pTask)) {
+    //      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+    //      stDebug("s-task:%s try start task in %dms in tmr, since all upstream inputQ is closed, ref:%d", pTask->id.idStr,
+    //              SINK_TASK_IDLE_DURATION, ref);
+    //
+    //      if (pTask->outputInfo.pTimer == NULL) {
+    //        pTask->outputInfo.pTimer = taosTmrStart(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer);
+    //      } else {
+    //        taosTmrReset(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer, &pTask->outputInfo.pTimer);
+    //      }
+    //    }
+
     return TSDB_CODE_SUCCESS;
   }
+
@@ -188,10 +206,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
   if (*numOfBlocks > 0) {
     *blockSize = streamQueueItemGetSize(*pInput);
     if (taskLevel == TASK_LEVEL__SINK) {
-      streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+      streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
     }
   } else {
-    streamTaskPutbackToken(pTask->pTokenBucket);
+    streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
   }
 
   return TSDB_CODE_SUCCESS;
@@ -207,7 +225,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
       stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
 
       // restore the token to bucket in case of checkpoint/trans-state msg
-      streamTaskPutbackToken(pTask->pTokenBucket);
+      streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
       *blockSize = 0;
       *numOfBlocks = 1;
       *pInput = qItem;
@@ -216,7 +234,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
       stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
       *blockSize = streamQueueItemGetSize(*pInput);
       if (taskLevel == TASK_LEVEL__SINK) {
-        streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+        streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
       }
 
       streamQueueProcessFail(pTask->inputInfo.queue);
@@ -237,7 +255,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
 
         *blockSize = streamQueueItemGetSize(*pInput);
         if (taskLevel == TASK_LEVEL__SINK) {
-          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
         }
 
         streamQueueProcessFail(pTask->inputInfo.queue);
@@ -255,7 +273,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
 
         *blockSize = streamQueueItemGetSize(*pInput);
         if (taskLevel == TASK_LEVEL__SINK) {
-          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
        }
 
        return TSDB_CODE_SUCCESS;
@@ -350,15 +368,15 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
 
 // the result should be put into the outputQ in any cases, otherwise, the result may be lost
 int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
-  STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
+  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
 
-  while (streamQueueIsFull(pTask->outputInfo.queue)) {
+  while (streamQueueIsFull(pTask->outputq.queue)) {
     if (streamTaskShouldStop(&pTask->status)) {
       stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
       return TSDB_CODE_STREAM_EXEC_CANCELLED;
     }
 
-    int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
+    int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
     double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
     // let's wait for there are enough space to hold this result pBlock
     stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
@@ -368,7 +386,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
 
   int32_t code = taosWriteQitem(pQueue, pBlock);
 
-  int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
+  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
   double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
   if (code != 0) {
     stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
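
The sink rate limit in streamTaskGetDataFromInputQ follows a take/putback/consume protocol: extract a token before polling the input queue, hand it back when nothing usable (or only a checkpoint/trans-state control message) came out, and otherwise charge the polled bytes against the quota. A self-contained stub of that protocol:

typedef struct { int tokens; double quotaLeft; } BucketStub;

static int  extractToken(BucketStub* b)        { return b->tokens > 0 ? (b->tokens--, 1) : 0; }
static void putbackToken(BucketStub* b)        { b->tokens++; }
static void consumeQuota(BucketStub* b, int n) { b->quotaLeft -= (double)n; }

static int readForSink(BucketStub* b, int (*poll)(int* bytes)) {
  if (!extractToken(b)) return 0;                    /* no token: caller retries later */
  int bytes = 0;
  if (!poll(&bytes)) { putbackToken(b); return 0; }  /* nothing read: restore token */
  consumeQuota(b, bytes);                            /* real data: charge the quota */
  return 1;
}
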
@@ -40,7 +40,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
   int32_t vgId = pMeta->vgId;
 
   if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
-    pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
+    pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
     stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
             pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
             streamGetTaskStatusStr(pTask->status.taskStatus));
@@ -144,8 +144,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
   // serialize streamProcessScanHistoryFinishRsp
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     req.reqId = tGenIdPI64();
-    req.downstreamNodeId = pTask->fixedDispatcher.nodeId;
-    req.downstreamTaskId = pTask->fixedDispatcher.taskId;
+    req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
+    req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
     pTask->checkReqId = req.reqId;
 
     stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
@@ -153,9 +153,9 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
             pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
             pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
 
-    streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
+    streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
 
     int32_t numOfVgs = taosArrayGetSize(vgInfo);
     pTask->notReadyTasks = numOfVgs;
@@ -225,9 +225,9 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
             pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
-    streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->fixedDispatcher.epSet);
+    streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
 
     int32_t numOfVgs = taosArrayGetSize(vgInfo);
     for (int32_t i = 0; i < numOfVgs; i++) {
@@ -241,7 +241,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
   }
 
   destroyRecheckInfo(pInfo);
-  int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+  int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
   stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
 }
 
@@ -341,7 +341,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
 
     doProcessDownstreamReadyRsp(pTask, numOfReqs);
   } else {
-    int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
+    int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
     stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
             pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
   }
@@ -367,7 +367,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
   } else {
     STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
 
-    int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+    int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
             pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
 
@@ -528,7 +528,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
   ASSERT(left >= 0);
 
   if (left == 0) {
-    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList);
+    int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
     stDebug(
         "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
         "rsp to all upstream tasks",
@@ -640,7 +640,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
   }
 
   if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
-    int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     streamMetaReleaseTask(pMeta, pTask);
 
     stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
@@ -672,7 +672,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
   }
 
   // not in timer anymore
-  int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+  int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
   stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
           pHTaskInfo->retryTimes, ref);
   streamMetaReleaseTask(pMeta, pTask);
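
A fix repeated across this commit: the ref values returned by atomic_{add,sub}_fetch_32 on status.timerActive were stored in int8_t, which silently truncates once the 32-bit counter passes 127; every such local is widened to int32_t. Minimal illustration:

#include <stdatomic.h>
#include <stdint.h>

static int32_t timerRelease(atomic_int* pTimerActive) {
  int32_t ref = atomic_fetch_sub(pTimerActive, 1) - 1;  /* keep the full width */
  return ref;                                           /* safe to log with %d */
}
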