Merge pull request #21396 from taosdata/refact/fillhistory

Refact/fillhistory
This commit is contained in:
Haojun Liao 2023-05-21 01:12:00 +08:00 committed by GitHub
commit 9423001fea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 419 additions and 261 deletions

View File

@ -372,6 +372,7 @@ typedef struct {
int32_t upstreamChildId; int32_t upstreamChildId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
int32_t blockNum; int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t> SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*> SArray* data; // SArray<SRetrieveTableRsp*>
} SStreamDispatchReq; } SStreamDispatchReq;
@ -527,7 +528,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
@ -536,7 +537,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
void streamTaskInputFail(SStreamTask* pTask); void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus);

View File

@ -2513,9 +2513,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
*actualLen = dataLen; *actualLen = dataLen;
*groupId = pBlock->info.id.groupId; *groupId = pBlock->info.id.groupId;
ASSERT(dataLen > 0); ASSERT(dataLen > 0);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols);
return dataLen; return dataLen;
} }

View File

@ -43,7 +43,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp, false); streamProcessDispatchMsg(pTask, &req, &rsp, false);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
@ -203,17 +203,13 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
.info = pMsg->info, streamProcessDispatchMsg(pTask, &req, &rsp, exec);
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
} }
return 0;
} }
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
@ -227,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = { .info = pMsg->info, .code = 0};
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
tDeleteStreamRetrieveReq(&req); tDeleteStreamRetrieveReq(&req);

View File

@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,

View File

@ -760,6 +760,7 @@ void freePtr(void *ptr) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
} }
@ -852,14 +853,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req; SStreamTaskCheckReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req); tDecodeSStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId; int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = { SStreamTaskCheckRsp rsp = {
.reqId = req.reqId, .reqId = req.reqId,
@ -873,18 +877,18 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId:0x%" PRIx64 tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId, pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
rsp.upstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64 tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") %d at node %d, check req from task %d at node %d, rsp status %d", ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status); rsp.status);
} }
@ -892,9 +896,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SEncoder encoder; SEncoder encoder;
int32_t code; int32_t code;
int32_t len; int32_t len;
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) { if (code < 0) {
tqError("unable to encode rsp %d", __LINE__); tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
return -1; return -1;
} }
@ -907,6 +912,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
return 0; return 0;
} }
@ -918,17 +924,20 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) { if (code < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId);
return -1; return -1;
} }
@ -948,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 1.deserialize msg and build task // 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
return -1; return -1;
} }
@ -965,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 2.save task, use the newest commit version as the initial start version of stream task. // 2.save task, use the newest commit version as the initial start version of stream task.
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
@ -980,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); pTask->status.taskStatus, numOfTasks);
return 0; return 0;
} }
@ -1229,15 +1240,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req;
SDecoder decoder; SStreamDispatchReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchReq(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
@ -1356,7 +1369,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchReq(pTask, &req, &rsp, false); streamProcessDispatchMsg(pTask, &req, &rsp, false);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);

View File

@ -119,7 +119,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t status = pTask->status.taskStatus; int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) { if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); // tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
@ -132,7 +132,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
} }
if (tInputQueueIsFull(pTask)) { if (tInputQueueIsFull(pTask)) {
tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }

View File

@ -5506,3 +5506,58 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
} }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/*-------------todo:refactor the implementation of those APIs in this file to separate the API into two files------*/
// opt perf, do NOT create so many readers
/**
 * Open a descending-order reader on each table in turn and track the largest
 * leading timestamp found in the first data block of every table.
 *
 * @param pVnode      vnode handle, passed through to tsdbReaderOpen()
 * @param pTableList  array of STableKeyInfo, one entry per table
 * @param numOfTables number of entries in pTableList
 * @param pIdStr      id string, passed through to tsdbReaderOpen()
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when a working buffer cannot
 *         be allocated; otherwise the error code reported by tsdbReaderOpen().
 *
 * NOTE(review): the maximum timestamp is accumulated in `key` but is not
 * returned; the function reports only a status code. Returning `key` would
 * change what callers observe, so it is left as is -- confirm the intended
 * contract with the callers.
 */
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
  int64_t         ret = 0;
  int64_t         key = INT64_MIN;
  SSDataBlock*    pBlock = NULL;
  STsdbReader*    pReader = NULL;
  STableKeyInfo*  pTableKeyInfo = pTableList;
  SColumnInfoData data = {0};

  SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
                              .startVersion = -1, .endVersion = -1};
  cond.twindows.skey = INT64_MIN;
  cond.twindows.ekey = INT64_MAX;

  cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
  cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
  if (cond.colList == NULL || cond.pSlotList == NULL) {
    // bail out instead of writing through a NULL pointer below
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  // only the primary timestamp column is requested
  cond.colList[0].colId = 1;
  cond.colList[0].slotId = 0;
  cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
  cond.pSlotList[0] = 0;

  pBlock = createDataBlock();
  if (pBlock == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  data.info = (SColumnInfo){.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
  blockDataAppendColInfo(pBlock, &data);

  for (int32_t i = 0; i < numOfTables; ++i) {
    pReader = NULL;

    int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): assumes tsdbReaderOpen() releases a half-built reader itself on failure -- confirm
      ret = code;
      break;
    }

    bool hasData = false;
    code = tsdbNextDataBlock(pReader, &hasData);
    if (hasData && code == TSDB_CODE_SUCCESS) {
      SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
      if (pCol != NULL && pCol->pData != NULL) {
        int64_t k = *(int64_t*)pCol->pData;
        if (key < k) {
          key = k;
        }
      }
    }

    // always close the reader, including when the table yields no data
    tsdbReaderClose(pReader);
    pReader = NULL;
  }

_end:
  // single exit point: release everything this function allocated
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  taosMemoryFree(cond.colList);
  taosMemoryFree(cond.pSlotList);
  return ret;
}

View File

@ -551,9 +551,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
// start to restore all stream tasks // start to restore all stream tasks
if (tsDisableStream) { if (tsDisableStream) {
vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else { } else {
vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq); tqStartStreamTasks(pVnode->pTq);
} }
} }

View File

@ -33,8 +33,12 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);

View File

@ -120,39 +120,35 @@ int32_t streamSchedExec(SStreamTask* pTask) {
} }
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status = 0;
int8_t status;
// enqueue data block SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
if (pData != NULL) { if (pBlock == NULL) {
pData->type = STREAM_INPUT__DATA_BLOCK; streamTaskInputFail(pTask);
pData->srcVgId = pReq->dataSrcVgId; status = TASK_INPUT_STATUS__FAILED;
// decode qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
/*pData->blocks = pReq->data;*/ pTask->id.idStr);
/*pBlock->sourceVer = pReq->sourceVer;*/ } else {
streamDispatchReqToData(pReq, pData); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) {
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { // input queue is full, upstream is blocked now } else { // input queue is full, upstream is blocked now
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;
} }
} else {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
} }
// rsp by input status // rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = htobe64(pReq->streamId);
pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
pCont->downstreamNodeId = htonl(pTask->nodeId);
pCont->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
@ -165,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId,
pReq->srcTaskId, pReq->reqId); pReq->srcTaskId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->type = STREAM_INPUT__DATA_RETRIEVE;
@ -197,30 +193,31 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { // todo add log
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0; int32_t code = 0;
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); destroyStreamDataBlock(pBlock);
taosFreeQitem(pBlock);
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); destroyStreamDataBlock(pBlock);
taosFreeQitem(pBlock);
} else { } else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputQueue->queue, pBlock); code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { if (code != 0) { // todo failed to add it into the output queue, free it.
return code; return code;
} }
streamDispatch(pTask);
streamDispatchStreamBlock(pTask);
} }
return 0; return 0;
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamNodeId); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation // todo add the input queue buffer limitation
streamTaskEnqueueBlocks(pTask, pReq, pRsp); streamTaskEnqueueBlocks(pTask, pReq, pRsp);
@ -257,8 +254,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
return 0; return 0;
} }
// continue dispatch // continue dispatch one block to down stream in pipeline
streamDispatch(pTask); streamDispatchStreamBlock(pTask);
return 0; return 0;
} }
@ -268,13 +265,13 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
} }
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatch(pTask);*/ /*streamDispatchStreamBlock(pTask);*/
/*}*/ /*}*/
return 0; return 0;
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
@ -294,16 +291,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
px->submit.msgLen, px->submit.ver, numOfBlocks, size); px->submit.msgLen, px->submit.ver, total, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
size); size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem); taosFreeQitem(pItem);
@ -312,22 +306,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
size); size);
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} }
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {

View File

@ -15,20 +15,28 @@
#include "streamInc.h" #include "streamInc.h"
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
if (pData == NULL) {
return NULL;
}
pData->type = blockType;
pData->srcVgId = srcVg;
int32_t blockNum = pReq->blockNum; int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
if (pArray == NULL) { if (pArray == NULL) {
return -1; return NULL;
} }
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data); blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
@ -39,8 +47,41 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;
} }
pData->blocks = pArray; pData->blocks = pArray;
return 0; return pData;
}
/*
 * Wrap an array of result blocks into a newly allocated stream data block.
 *
 * pItem      - the input queue item the results were produced from; only its
 *              type (and, for submit inputs, its ver field) is read.
 * pTask      - the task that produced the results; supplies selfChildId.
 * resultSize - forwarded as the last argument of taosAllocateQitem.
 * pRes       - the result array; stored in the returned block's `blocks` field.
 *
 * Returns the new block, or NULL when the queue item cannot be allocated. On
 * that failure the elements of pRes are released through taosArrayClearEx;
 * pRes itself is not destroyed here.
 */
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pResult = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pResult == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pResult->blocks = pRes;
  pResult->type = STREAM_INPUT__DATA_BLOCK;

  // only submit-style inputs carry a version; other input types leave childId/sourceVer untouched
  switch (pItem->type) {
    case STREAM_INPUT__DATA_SUBMIT:
      pResult->childId = pTask->selfChildId;
      pResult->sourceVer = ((SStreamDataSubmit*)pItem)->ver;
      break;

    case STREAM_INPUT__MERGED_SUBMIT:
      pResult->childId = pTask->selfChildId;
      pResult->sourceVer = ((SStreamMergedSubmit*)pItem)->ver;
      break;

    default:
      break;
  }

  return pResult;
}
// Release a stream data block. A NULL pBlock is ignored. Otherwise the `blocks`
// array is destroyed with blockDataFreeRes passed as the FDelete callback, and
// then the queue item itself is released with taosFreeQitem.
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
// destroy the contained blocks first, then the queue item that holds them
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} }
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
@ -184,11 +225,13 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->submits);
int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1); int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0); ASSERT(ref >= 0);
if (ref == 0) { if (ref == 0) {
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(pSubmit->msgStr); taosMemoryFree(pSubmit->msgStr);

View File

@ -24,6 +24,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) { for (int32_t i = 0; i < pReq->blockNum; i++) {
@ -45,6 +46,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
ASSERT(pReq->blockNum > 0); ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
@ -135,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
req.dstNodeId = pEpInfo->nodeId; req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId; req.dstTaskId = pEpInfo->taskId;
int32_t code;
int32_t len; int32_t len;
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
if (code < 0) { if (code < 0) {
@ -155,30 +157,25 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
tEncodeStreamRetrieveReq(&encoder, &req); tEncodeStreamRetrieveReq(&encoder, &req);
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len };
.code = 0,
.msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf,
.contLen = sizeof(SMsgHead) + len,
};
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0); ASSERT(0);
goto CLEAR; goto CLEAR;
} }
buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
} }
code = 0; code = 0;
CLEAR: CLEAR:
taosMemoryFree(pRetrieve); taosMemoryFree(pRetrieve);
rpcFreeCont(buf); rpcFreeCont(buf);
return code; return code;
} }
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) return -1;
@ -205,6 +202,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf); taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0; return 0;
} }
@ -291,7 +289,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
return 0; return 0;
} }
int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
@ -320,11 +318,12 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf; msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType; msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
code = 0; code = 0;
return 0; return 0;
FAIL: FAIL:
if (buf) rpcFreeCont(buf); if (buf) rpcFreeCont(buf);
return code; return code;
@ -360,7 +359,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0); ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1; return -1;
} }
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
@ -376,9 +375,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} }
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = -1; int32_t code = 0;
int32_t blockNum = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(blockNum != 0); ASSERT(numOfBlocks != 0);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = { SStreamDispatchReq req = {
@ -387,19 +386,25 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
.upstreamTaskId = pTask->id.taskId, .upstreamTaskId = pTask->id.taskId,
.upstreamChildId = pTask->selfChildId, .upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId, .upstreamNodeId = pTask->nodeId,
.blockNum = blockNum, .blockNum = numOfBlocks,
}; };
req.data = taosArrayInit(blockNum, sizeof(void*)); req.data = taosArrayInit(numOfBlocks, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) { if (req.data == NULL || req.dataLen == NULL) {
goto FAIL_FIXED_DISPATCH; taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return TSDB_CODE_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
goto FAIL_FIXED_DISPATCH;
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
} }
} }
@ -410,19 +415,12 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId; req.taskId = downstreamTaskId;
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
pTask->selfChildId, blockNum, downstreamTaskId, vgId); pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
goto FAIL_FIXED_DISPATCH;
}
code = 0;
FAIL_FIXED_DISPATCH:
taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen); taosArrayDestroy(req.dataLen);
return code; return code;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0); ASSERT(rspCnt == 0);
@ -452,13 +450,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
pReqs[i].taskId = pVgInfo->taskId; pReqs[i].taskId = pVgInfo->taskId;
} }
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast // TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
for (int32_t j = 0; j < vgSz; j++) { for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
@ -475,7 +473,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
} }
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
blockNum, vgSz); numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) { if (pReqs[i].blockNum > 0) {
@ -483,7 +481,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
pReqs[i].blockNum, pVgInfo->vgId); pReqs[i].blockNum, pVgInfo->vgId);
if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
} }
@ -501,9 +499,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return code; return code;
} }
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) { if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
@ -517,23 +514,22 @@ int32_t streamDispatch(SStreamTask* pTask) {
return 0; return 0;
} }
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) { if (pDispatchedBlock == NULL) {
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0; return 0;
} }
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t code = 0; int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock);
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (code != TSDB_CODE_SUCCESS) {
code = -1;
streamQueueProcessFail(pTask->outputQueue); streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
} }
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); // this block can be freed only when it has been pushed to down stream.
taosFreeQitem(pBlock); destroyStreamDataBlock(pDispatchedBlock);
return code; return code;
} }

View File

@ -18,6 +18,9 @@
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128 #define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16 #define MIN_STREAM_EXEC_BATCH_NUM 16
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000
static int32_t updateCheckPointInfo (SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
@ -29,56 +32,57 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE); return (status == TASK_STATUS__PAUSE);
} }
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
int32_t* totalBlocks) {
// doDumpResult: hand one batch of result blocks (pRes) to the task output and
// add the batch to the running totals (*totalSize, *totalBlocks).
// pRes is consumed on every path: it is either attached to the stream data
// block created below or destroyed in this function.
// Returns the updateCheckPointInfo() error code when that call fails, -1 when
// the stream data block cannot be created or the output call reports
// TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, and TSDB_CODE_SUCCESS otherwise.
int32_t code = updateCheckPointInfo(pTask);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
}
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
}
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
// NOTE(review): only the queue-out-of-memory code is treated as a failure here;
// any other value returned by streamTaskOutputResultBlock is not propagated.
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return -1;
}
*totalSize += size;
*totalBlocks += numOfBlocks;
} else {
// nothing to output: release the empty result array
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { *totalBlocks = 0;
int8_t status = atomic_load_8(&pTask->status.taskStatus); *totalSize = 0;
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
} else {
break;
}
}
// set input int32_t size = 0;
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; int32_t numOfBlocks = 0;
if (pItem->type == STREAM_INPUT__GET_RES) { SArray* pRes = NULL;
const SStreamTrigger* pTrigger = (const SStreamTrigger*)data;
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else {
ASSERT(0);
}
// pExecutor
while (1) { while (1) {
if (pRes == NULL) {
pRes = taosArrayInit(4, sizeof(SSDataBlock));
}
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroy(pRes); // memory leak
return 0; return 0;
} }
@ -97,17 +101,18 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER; block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
numOfBlocks += 1;
qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
pRetrieveBlock->reqId); pRetrieveBlock->reqId);
} }
break; break;
} }
@ -118,15 +123,40 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
continue; continue;
} }
qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId);
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
pTask->selfChildId, numOfBlocks, size / 1048576.0);
// current output should be dispatched to down stream nodes
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pRes = NULL;
size = 0;
numOfBlocks = 0;
}
} }
return 0; if (numOfBlocks > 0) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
} else {
taosArrayDestroy(pRes);
}
return code;
} }
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
@ -200,7 +230,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes); code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes); taosFreeQitem(qRes);
@ -209,7 +239,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask); streamDispatchStreamBlock(pTask);
} }
if (finished) { if (finished) {
@ -246,7 +276,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, (SStreamDataBlock*)pItem); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
} }
// exec impl // exec impl
@ -257,6 +287,34 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
} }
#endif #endif
/*
 * Refresh the task's checkpoint info from the executor and persist it.
 *
 * The checkpoint id and data version are read with qGetCheckpointVersion. When
 * the id is not greater than the one already stored in pTask->chkInfo, nothing
 * is changed. Otherwise chkInfo is replaced (currentVer is carried over), the
 * task is saved and the stream meta is committed while holding the meta write
 * latch.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when streamMetaCommit reports a failure.
 */
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t latestCkId = 0;
  int64_t latestVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &latestVer, &latestCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (!(latestCkId > pInfo->id)) {
    // checkpoint has not advanced, nothing to save
    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pInfo->version, latestVer, pInfo->id, latestCkId);

  SCheckpointInfo updated = {.version = latestVer, .id = latestCkId, .currentVer = pInfo->currentVer};
  pTask->chkInfo = updated;

  // save and commit under the meta write latch; the latch is released before logging
  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  int32_t commitRet = streamMetaCommit(pTask->pMeta);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitRet < 0) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
while (1) { while (1) {
@ -272,6 +330,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
return 0; return 0;
} }
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
@ -324,74 +383,70 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskOutput(pTask, (SStreamDataBlock*)pInput); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); // wait for the task to be ready to go
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
} else {
break;
}
}
int64_t st = taosGetTimestampMs();
qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes); {
// set input
void* pExecutor = pTask->exec.pExecutor;
int64_t ckId = 0; const SStreamQueueItem* pItem = pInput;
int64_t dataVer = 0; if (pItem->type == STREAM_INPUT__GET_RES) {
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
", checkPoint id:%" PRId64 " -> %" PRId64, ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
taosWLockLatch(&pTask->pMeta->lock); SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
streamMetaSaveTask(pTask->pMeta, pTask); qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
if (streamMetaCommit(pTask->pMeta) < 0) { qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
taosWUnLockLatch(&pTask->pMeta->lock); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
return -1; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else { } else {
taosWUnLockLatch(&pTask->pMeta->lock); ASSERT(0);
qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
} }
} else {
qDebug("s-task:%s exec end", pTask->id.idStr);
} }
if (taosArrayGetSize(pRes) != 0) { int64_t resSize = 0;
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int32_t totalBlocks = 0;
if (qRes == NULL) { streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(pInput);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK; double el = (taosGetTimestampMs() - st) / 1000.0;
qRes->blocks = pRes; qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
// backpressure and record position
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(pInput);
taosFreeQitem(qRes);
return -1;
}
} else {
taosArrayDestroy(pRes);
}
streamFreeQitem(pInput); streamFreeQitem(pInput);
} }
return 0; return 0;
} }

View File

@ -296,6 +296,7 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
return 0; return 0;
} }
// todo add error log
int32_t streamMetaCommit(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, pMeta->txn) < 0) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
qError("failed to commit stream meta"); qError("failed to commit stream meta");
@ -311,6 +312,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
return 0; return 0;
} }
@ -373,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
} }
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstream(pTask, ver);
} }
} }

View File

@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus);
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version); streamSourceRecoverPrepareStep1(pTask, version);
@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask); streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) { } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
// sink nodes has no specified operation for fill history
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
} }
@ -54,7 +55,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
// checkstatus // checkstatus
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId; pTask->checkReqId = req.reqId;
qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId); req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->recoverTryingDownstream = vgSz; pTask->recoverTryingDownstream = numOfVgs;
pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
taosArrayPush(pTask->checkReqIds, &req.reqId); taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
@ -110,15 +111,16 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.childId = pRsp->childId, .childId = pRsp->childId,
}; };
qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < vgSz; i++) { int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->taskId == req.downstreamTaskId) { if (pVgInfo->taskId == req.downstreamTaskId) {
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
@ -134,7 +136,9 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
} }
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
if (pRsp->status == 1) { if (pRsp->status == 1) {
@ -161,7 +165,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
@ -174,9 +178,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
ASSERT(0); ASSERT(0);
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
pRsp->downstreamTaskId, pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosMsleep(100); taosMsleep(100);
streamRecheckOneDownstream(pTask, pRsp); streamRecheckOneDownstream(pTask, pRsp);
} }