fix(stream): do some internal refactor.
This commit is contained in:
parent
b25e6d3250
commit
4490f40c96
|
@ -174,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
|
|||
SArray* pRes);
|
||||
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
||||
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr);
|
||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||
|
||||
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||
|
|
|
@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
|
|||
taosFreeQitem(pBlock);
|
||||
}
|
||||
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) {
|
||||
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
|
||||
if (pArray == NULL) {
|
||||
return -1;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
stError("failed to prepare retrieve block, %s", id);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosArrayPush(pArray, &(SSDataBlock){0});
|
||||
|
@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
|
|||
pData->reqId = pReq->reqId;
|
||||
pData->blocks = pArray;
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
|
||||
|
|
|
@ -944,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
|||
|
||||
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
// enqueue
|
||||
if (pData != NULL) {
|
||||
stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
|
||||
|
||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||
pData->srcVgId = 0;
|
||||
streamRetrieveReqToData(pReq, pData);
|
||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||
status = TASK_INPUT_STATUS__NORMAL;
|
||||
} else {
|
||||
status = TASK_INPUT_STATUS__FAILED;
|
||||
}
|
||||
} else { // todo handle oom
|
||||
/*streamTaskInputFail(pTask);*/
|
||||
/*status = TASK_INPUT_STATUS__FAILED;*/
|
||||
if (pData == NULL) {
|
||||
stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
// enqueue
|
||||
stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
|
||||
|
||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||
pData->srcVgId = 0;
|
||||
|
||||
int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosFreeQitem(pData);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
|
||||
pTask->id.idStr);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
|
||||
if(code != 0){
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
return streamTrySchedExec(pTask);
|
||||
|
|
Loading…
Reference in New Issue