refactor(stream): do some internal refactor.
This commit is contained in:
parent
06cf3588da
commit
cb26dd9fa2
|
@ -292,8 +292,16 @@ typedef struct SSTaskBasicInfo {
|
||||||
typedef struct SDispatchMsgInfo {
|
typedef struct SDispatchMsgInfo {
|
||||||
void* pData; // current dispatch data
|
void* pData; // current dispatch data
|
||||||
int16_t msgType; // dispatch msg type
|
int16_t msgType; // dispatch msg type
|
||||||
|
int32_t retryCount; // retry send data count
|
||||||
|
int64_t blockingTs; // output blocking timestamp
|
||||||
} SDispatchMsgInfo;
|
} SDispatchMsgInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t outputType;
|
||||||
|
int8_t outputStatus;
|
||||||
|
SStreamQueue* outputQueue;
|
||||||
|
} SSTaskOutputInfo;
|
||||||
|
|
||||||
struct SStreamTask {
|
struct SStreamTask {
|
||||||
SStreamId id;
|
SStreamId id;
|
||||||
SSTaskBasicInfo info;
|
SSTaskBasicInfo info;
|
||||||
|
@ -536,7 +544,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
||||||
|
int64_t dstTaskId);
|
||||||
|
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
int32_t streamSetupTrigger(SStreamTask* pTask);
|
int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ typedef struct {
|
||||||
|
|
||||||
extern SStreamGlobalEnv streamEnv;
|
extern SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
|
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||||
|
|
||||||
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||||
|
@ -44,6 +45,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
|
||||||
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||||
|
|
|
@ -235,9 +235,26 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo record the idle time for dispatch data
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// dispatch message failed: network error, or node not available.
|
||||||
|
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
|
||||||
|
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||||
|
// happened too fast. todo handle the shuffle dispatch failure
|
||||||
|
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
|
||||||
|
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||||
|
int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
|
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
|
||||||
|
|
||||||
|
// there are other dispatch message not response yet
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
|
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
|
||||||
|
@ -246,23 +263,31 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->msgInfo.retryCount = 0;
|
||||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
||||||
|
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);
|
||||||
|
|
||||||
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
|
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||||
// todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms.
|
if (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||||
// TODO: init recover timer
|
|
||||||
qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId);
|
int32_t waitDuration = 300; // 300 ms
|
||||||
|
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
|
||||||
|
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
|
||||||
|
streamRetryDispatchStreamBlock(pTask, waitDuration);
|
||||||
|
} else { // pipeline send data in output queue
|
||||||
|
// this message has been sent successfully, let's try next one.
|
||||||
|
destroyStreamDataBlock(pTask->msgInfo.pData);
|
||||||
|
pTask->msgInfo.pData = NULL;
|
||||||
|
|
||||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus);
|
|
||||||
|
|
||||||
return 0;
|
// otherwise, continue dispatch the first block to down stream task in pipeline
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise, continue dispatch the first block to down stream task in pipeline
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,7 +296,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
/*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
||||||
/*streamDispatchStreamBlock(pTask);*/
|
/*streamDispatchStreamBlock(pTask);*/
|
||||||
/*}*/
|
/*}*/
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -13,10 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <util/ttimer.h>
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
|
|
||||||
|
|
||||||
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -98,6 +97,27 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
||||||
|
int64_t dstTaskId) {
|
||||||
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
pReq->dataSrcVgId = vgId;
|
||||||
|
pReq->upstreamTaskId = pTask->id.taskId;
|
||||||
|
pReq->upstreamChildId = pTask->info.selfChildId;
|
||||||
|
pReq->upstreamNodeId = pTask->info.nodeId;
|
||||||
|
pReq->blockNum = numOfBlocks;
|
||||||
|
pReq->taskId = dstTaskId;
|
||||||
|
|
||||||
|
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
|
||||||
|
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
|
||||||
|
if (pReq->data == NULL || pReq->dataLen == NULL) {
|
||||||
|
taosArrayDestroyP(pReq->data, taosMemoryFree);
|
||||||
|
taosArrayDestroy(pReq->dataLen);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
|
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
|
||||||
taosArrayDestroyP(pReq->data, taosMemoryFree);
|
taosArrayDestroyP(pReq->data, taosMemoryFree);
|
||||||
taosArrayDestroy(pReq->dataLen);
|
taosArrayDestroy(pReq->dataLen);
|
||||||
|
@ -248,7 +268,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet) {
|
SEpSet* pEpSet) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
|
@ -377,25 +397,17 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
|
|
||||||
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||||
ASSERT(numOfBlocks != 0);
|
ASSERT(numOfBlocks != 0);
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
SStreamDispatchReq req = {
|
SStreamDispatchReq req = {0};
|
||||||
.streamId = pTask->id.streamId,
|
|
||||||
.dataSrcVgId = pData->srcVgId,
|
|
||||||
.upstreamTaskId = pTask->id.taskId,
|
|
||||||
.upstreamChildId = pTask->info.selfChildId,
|
|
||||||
.upstreamNodeId = pTask->info.nodeId,
|
|
||||||
.blockNum = numOfBlocks,
|
|
||||||
};
|
|
||||||
|
|
||||||
req.data = taosArrayInit(numOfBlocks, POINTER_BYTES);
|
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
|
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId);
|
||||||
if (req.data == NULL || req.dataLen == NULL) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroyP(req.data, taosMemoryFree);
|
return code;
|
||||||
taosArrayDestroy(req.dataLen);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
|
@ -411,9 +423,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
|
|
||||||
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
|
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
|
||||||
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
|
||||||
|
|
||||||
req.taskId = downstreamTaskId;
|
|
||||||
|
|
||||||
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
|
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
|
||||||
pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
||||||
|
@ -426,8 +435,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
|
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
|
||||||
ASSERT(rspCnt == 0);
|
ASSERT(rspCnt == 0);
|
||||||
|
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t vgSz = taosArrayGetSize(vgInfo);
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
|
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
|
||||||
if (pReqs == NULL) {
|
if (pReqs == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -435,20 +445,11 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
pReqs[i].streamId = pTask->id.streamId;
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
pReqs[i].dataSrcVgId = pData->srcVgId;
|
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId);
|
||||||
pReqs[i].upstreamTaskId = pTask->id.taskId;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pReqs[i].upstreamChildId = pTask->info.selfChildId;
|
|
||||||
pReqs[i].upstreamNodeId = pTask->info.nodeId;
|
|
||||||
pReqs[i].blockNum = 0;
|
|
||||||
pReqs[i].data = taosArrayInit(0, sizeof(void*));
|
|
||||||
pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
|
|
||||||
if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
|
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
|
||||||
pReqs[i].taskId = pVgInfo->taskId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
|
@ -456,15 +457,18 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
|
|
||||||
// TODO: do not use broadcast
|
// TODO: do not use broadcast
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
|
||||||
for (int32_t j = 0; j < vgSz; j++) {
|
for (int32_t j = 0; j < vgSz; j++) {
|
||||||
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReqs[j].blockNum == 0) {
|
if (pReqs[j].blockNum == 0) {
|
||||||
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
}
|
}
|
||||||
pReqs[j].blockNum++;
|
pReqs[j].blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -495,13 +499,31 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
|
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
|
||||||
taosArrayDestroy(pReqs[i].dataLen);
|
taosArrayDestroy(pReqs[i].dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pReqs);
|
taosMemoryFree(pReqs);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
|
SStreamTask* pTask = param;
|
||||||
|
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
||||||
|
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamRetryDispatchStreamBlock(pTask, 300);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
|
||||||
|
qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration);
|
||||||
|
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->timer);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||||
|
|
||||||
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
|
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
|
||||||
if (numOfElems > 0) {
|
if (numOfElems > 0) {
|
||||||
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
|
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
|
||||||
|
@ -516,6 +538,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pTask->msgInfo.pData == NULL);
|
||||||
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
|
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||||
|
|
||||||
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
||||||
|
@ -525,16 +548,28 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->msgInfo.pData = pBlock;
|
||||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
int32_t code = streamDispatchAllBlocks(pTask, pBlock);
|
int32_t retryCount = 0;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
streamQueueProcessFail(pTask->outputQueue);
|
while (1) {
|
||||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
int32_t code = streamDispatchAllBlocks(pTask, pBlock);
|
||||||
qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
|
||||||
|
tstrerror(code), pTask->outputStatus, retryCount);
|
||||||
|
|
||||||
|
if (++retryCount > 5) { // add to timer to retry
|
||||||
|
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, retry in %dms", pTask->id.idStr,
|
||||||
|
retryCount, tstrerror(code), 300);
|
||||||
|
streamRetryDispatchStreamBlock(pTask, 300);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this block can be freed only when it has been pushed to down stream.
|
// this block can not be deleted until it has been sent to downstream task successfully.
|
||||||
destroyStreamDataBlock(pBlock);
|
return TSDB_CODE_SUCCESS;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,11 +237,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
//
|
|
||||||
// if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
// qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
|
|
||||||
// streamDispatchStreamBlock(pTask);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (finished) {
|
if (finished) {
|
||||||
break;
|
break;
|
||||||
|
@ -334,7 +329,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
// downstream task's input queue is blocked, stop immediately
|
||||||
|
if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED)) {
|
||||||
if (batchSize > 1) {
|
if (batchSize > 1) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -399,18 +395,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the task to be ready to go
|
|
||||||
while (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
|
||||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
|
||||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
|
|
||||||
atomic_load_8(&pTask->status.taskStatus));
|
|
||||||
taosMsleep(100);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
|
qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue