refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-07-13 16:32:25 +08:00
parent af7f78ada2
commit 813f4cb363
9 changed files with 53 additions and 24 deletions

View File

@ -249,6 +249,7 @@ typedef struct SStreamChildEpInfo {
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
} SStreamChildEpInfo;
typedef struct SStreamId {
@ -310,8 +311,9 @@ struct SStreamTask {
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pRpcMsgList; // SArray<SRpcMsg*>
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pRpcMsgList; // SArray<SRpcMsg*>
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
@ -554,6 +556,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenUpstreamInput(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);

View File

@ -301,11 +301,11 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pDownstream->pUpstreamEpInfoList == NULL) {
pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES);
if (pDownstream->pUpstreamInfoList == NULL) {
pDownstream->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo);
taosArrayPush(pDownstream->pUpstreamInfoList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}

View File

@ -59,7 +59,7 @@ FAIL:
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
pTask->refCnt = 1;
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
@ -82,7 +82,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1;
}
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api);

View File

@ -814,7 +814,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1;
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
@ -865,6 +865,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
}
streamTaskOpenUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;

View File

@ -356,3 +356,26 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
}
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
if (num == 0) {
return;
}
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
pInfo->dataAllowed = true;
}
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
if (pInfo->taskId == taskId) {
pInfo->dataAllowed = false;
break;
}
}
}

View File

@ -114,7 +114,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
if (old == 0) {
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
@ -180,13 +180,14 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
streamSchedExec(pTask);
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
} else {
// todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ
// anymore
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0);
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
// close the inputQ for data from upstream task.
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
if (notReady > 0) {
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, notReady, num);

View File

@ -157,11 +157,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
.retrieveLen = dataStrLen,
};
int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
req.reqId = tGenIdPI64();
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId;
int32_t len;
@ -516,7 +516,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel,
num);

View File

@ -355,7 +355,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
// agg
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
pTask->numOfWaitingUpstream);
return 0;
@ -379,7 +379,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, in
ASSERT(left >= 0);
if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
pTask->id.idStr, numOfTasks);

View File

@ -99,10 +99,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
@ -165,7 +165,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
@ -173,7 +173,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo);
taosArrayPush(pTask->pUpstreamInfoList, &pInfo);
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
@ -226,7 +226,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader);
}
taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
taosArrayDestroyP(pTask->pUpstreamInfoList, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);