fix(stream): dispatch
This commit is contained in:
parent
9c8055c479
commit
3fa5791889
|
@ -171,8 +171,8 @@ typedef struct {
|
||||||
} STaskDispatcherFixedEp;
|
} STaskDispatcherFixedEp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// int8_t hashMethod;
|
|
||||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
int32_t waitingRspCnt;
|
||||||
SUseDbRsp dbInfo;
|
SUseDbRsp dbInfo;
|
||||||
} STaskDispatcherShuffle;
|
} STaskDispatcherShuffle;
|
||||||
|
|
||||||
|
|
|
@ -114,18 +114,26 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
|
||||||
|
|
||||||
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
|
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
|
||||||
pTask->sinkType = TASK_SINK__NONE;
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
|
|
||||||
|
bool isShuffle = false;
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
|
if (pDb->cfg.numOfVgroups > 1) {
|
||||||
|
isShuffle = true;
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
sdbRelease(pMnode->pSdb, pDb);
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pDb);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isShuffle) {
|
||||||
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t sz = taosArrayGetSize(pVgs);
|
int32_t sz = taosArrayGetSize(pVgs);
|
||||||
|
|
|
@ -468,7 +468,6 @@ typedef struct SStreamScanInfo {
|
||||||
SSDataBlock* pUpdateDataRes;
|
SSDataBlock* pUpdateDataRes;
|
||||||
// status for tmq
|
// status for tmq
|
||||||
// SSchemaWrapper schema;
|
// SSchemaWrapper schema;
|
||||||
STqOffset offset;
|
|
||||||
SNodeList* pGroupTags;
|
SNodeList* pGroupTags;
|
||||||
SNode* pTagCond;
|
SNode* pTagCond;
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
|
|
|
@ -14,12 +14,12 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tref.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "vnode.h"
|
#include "tref.h"
|
||||||
#include "tudf.h"
|
#include "tudf.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
||||||
int32_t exchangeObjRefPool = -1;
|
int32_t exchangeObjRefPool = -1;
|
||||||
|
@ -95,16 +95,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
|
||||||
if (tinfo == NULL) {
|
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
|
||||||
}
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
||||||
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
||||||
}
|
}
|
||||||
|
@ -573,11 +563,6 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
|
||||||
return pInfo->tqReader->pSchemaWrapper;
|
return pInfo->tqReader->pSchemaWrapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
|
|
||||||
SStreamScanInfo* pInfo = scanner;
|
|
||||||
return &pInfo->offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||||
|
@ -600,7 +585,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
while (1) {
|
while (1) {
|
||||||
uint8_t type = pOperator->operatorType;
|
uint8_t type = pOperator->operatorType;
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
// TODO add more check
|
||||||
|
if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
ASSERT(pOperator->numOfDownstream == 1);
|
||||||
|
pOperator = pOperator->pDownstream[0];
|
||||||
|
}
|
||||||
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
|
@ -681,35 +671,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
|
||||||
ASSERT(pOperator->numOfDownstream == 1);
|
|
||||||
pOperator = pOperator->pDownstream[0];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
|
|
||||||
if (uid == 0) {
|
|
||||||
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
|
||||||
uid = pTableInfo->uid;
|
|
||||||
ts = INT64_MIN;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
|
|
||||||
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
|
@ -1396,13 +1396,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
|
||||||
|
|
||||||
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
|
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
|
||||||
#if 1
|
|
||||||
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
||||||
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
||||||
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
|
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
|
||||||
taosMemoryFreeClear(pStreamScan->pTableScanOp);
|
taosMemoryFreeClear(pStreamScan->pTableScanOp);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
if (pStreamScan->tqReader) {
|
if (pStreamScan->tqReader) {
|
||||||
tqCloseReader(pStreamScan->tqReader);
|
tqCloseReader(pStreamScan->tqReader);
|
||||||
}
|
}
|
||||||
|
@ -2855,7 +2853,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) {
|
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
|
||||||
|
SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
@ -2874,7 +2873,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
|
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
|
||||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
@ -2905,7 +2903,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
while (pInfo->tableStartIndex < tableListSize) {
|
while (pInfo->tableStartIndex < tableListSize) {
|
||||||
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
|
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
|
||||||
|
pOperator);
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pBlock->info.groupId = pInfo->groupId;
|
pBlock->info.groupId = pInfo->groupId;
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
|
@ -2968,8 +2968,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
|
||||||
pBlock->info.groupId = 0;
|
pBlock->info.groupId = 0;
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
pBlock->info.type = type;
|
pBlock->info.type = type;
|
||||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) +
|
pBlock->info.rowSize =
|
||||||
sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
||||||
SColumnInfoData infoData = {0};
|
SColumnInfoData infoData = {0};
|
||||||
|
|
|
@ -38,7 +38,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
|
||||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
|
||||||
|
|
||||||
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
|
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
|
|
@ -219,6 +219,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||||
|
|
||||||
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
||||||
|
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
|
qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
|
||||||
|
if (leftRsp > 0) return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
|
|
@ -198,6 +198,154 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
SRpcMsg msg = {0};
|
||||||
|
|
||||||
|
// serialize
|
||||||
|
int32_t tlen;
|
||||||
|
tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
|
||||||
|
if (code < 0) goto FAIL;
|
||||||
|
code = -1;
|
||||||
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
msg.contLen = tlen + sizeof(SMsgHead);
|
||||||
|
msg.pCont = buf;
|
||||||
|
msg.msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
FAIL:
|
||||||
|
if (code < 0 && buf) rpcFreeCont(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||||
|
int32_t code = -1;
|
||||||
|
int32_t blockNum = taosArrayGetSize(pData->blocks);
|
||||||
|
ASSERT(blockNum != 0);
|
||||||
|
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
SStreamDispatchReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.dataSrcVgId = pData->srcVgId,
|
||||||
|
.upstreamTaskId = pTask->taskId,
|
||||||
|
.upstreamChildId = pTask->selfChildId,
|
||||||
|
.upstreamNodeId = pTask->nodeId,
|
||||||
|
.blockNum = blockNum,
|
||||||
|
};
|
||||||
|
|
||||||
|
req.data = taosArrayInit(blockNum, sizeof(void*));
|
||||||
|
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
|
||||||
|
if (req.data == NULL || req.dataLen == NULL) {
|
||||||
|
goto FAIL_FIXED_DISPATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||||
|
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
|
||||||
|
goto FAIL_FIXED_DISPATCH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
|
||||||
|
req.taskId = downstreamTaskId;
|
||||||
|
|
||||||
|
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
|
||||||
|
downstreamTaskId, vgId);
|
||||||
|
|
||||||
|
if (streamDispatchOneReq(pTask, &req, vgId, pEpSet) < 0) {
|
||||||
|
goto FAIL_FIXED_DISPATCH;
|
||||||
|
}
|
||||||
|
code = 0;
|
||||||
|
FAIL_FIXED_DISPATCH:
|
||||||
|
taosArrayDestroy(req.data);
|
||||||
|
taosArrayDestroy(req.dataLen);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
} else if (pTask->dispatchMsgType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0);
|
||||||
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
|
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
|
||||||
|
if (pReqs == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
|
pReqs[i].streamId = pTask->streamId;
|
||||||
|
pReqs[i].dataSrcVgId = pData->srcVgId;
|
||||||
|
pReqs[i].upstreamTaskId = pTask->taskId;
|
||||||
|
pReqs[i].upstreamChildId = pTask->selfChildId;
|
||||||
|
pReqs[i].upstreamNodeId = pTask->nodeId;
|
||||||
|
pReqs[i].blockNum = 0;
|
||||||
|
pReqs[i].data = taosArrayInit(0, sizeof(void*));
|
||||||
|
pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
|
||||||
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||||
|
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId);
|
||||||
|
|
||||||
|
// TODO: get hash function by hashMethod
|
||||||
|
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
|
||||||
|
|
||||||
|
// TODO: optimize search
|
||||||
|
int32_t j;
|
||||||
|
for (j = 0; j < vgSz; j++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||||
|
ASSERT(pVgInfo->vgId > 0);
|
||||||
|
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||||
|
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
||||||
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
|
if (pReqs[i].blockNum > 0) {
|
||||||
|
// send
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
|
||||||
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
|
}
|
||||||
|
pTask->shuffleDispatcher.waitingRspCnt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
code = 0;
|
||||||
|
FAIL_SHUFFLE_DISPATCH:
|
||||||
|
if (pReqs) {
|
||||||
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
|
taosArrayDestroy(pReqs[i].data);
|
||||||
|
taosArrayDestroy(pReqs[i].dataLen);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pReqs);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -262,29 +410,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
|
||||||
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
|
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
|
||||||
downstreamTaskId, vgId);
|
downstreamTaskId, vgId);
|
||||||
|
|
||||||
// serialize
|
streamDispatchOneReq(pTask, &req, vgId, *ppEpSet);
|
||||||
int32_t tlen;
|
|
||||||
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
|
|
||||||
if (code < 0) goto FAIL;
|
|
||||||
code = -1;
|
|
||||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
|
||||||
if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
pMsg->contLen = tlen + sizeof(SMsgHead);
|
|
||||||
pMsg->pCont = buf;
|
|
||||||
pMsg->msgType = pTask->dispatchMsgType;
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
|
@ -314,6 +440,18 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
|
|
||||||
qDebug("stream continue dispatching: task %d", pTask->taskId);
|
qDebug("stream continue dispatching: task %d", pTask->taskId);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
code = -1;
|
||||||
|
// TODO set status fail
|
||||||
|
goto FREE;
|
||||||
|
}
|
||||||
|
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
|
||||||
|
FREE:
|
||||||
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(pBlock);
|
||||||
|
#if 0
|
||||||
SRpcMsg dispatchMsg = {0};
|
SRpcMsg dispatchMsg = {0};
|
||||||
SEpSet* pEpSet = NULL;
|
SEpSet* pEpSet = NULL;
|
||||||
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
@ -325,5 +463,6 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
taosFreeQitem(pBlock);
|
taosFreeQitem(pBlock);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
return 0;
|
#endif
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,16 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
|
|
||||||
// set config
|
// set config
|
||||||
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
|
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
|
||||||
|
|
||||||
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
|
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
|
||||||
|
if (pWal->cfg.retentionSize > 0) {
|
||||||
|
pWal->cfg.retentionSize *= 1024;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWal->cfg.segSize > 0) {
|
||||||
|
pWal->cfg.segSize *= 1024;
|
||||||
|
}
|
||||||
|
|
||||||
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
|
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
|
||||||
|
|
||||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||||
|
|
|
@ -401,6 +401,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
pWal->writeHead.head.bodyLen = bodyLen;
|
pWal->writeHead.head.bodyLen = bodyLen;
|
||||||
pWal->writeHead.head.msgType = msgType;
|
pWal->writeHead.head.msgType = msgType;
|
||||||
|
pWal->writeHead.head.ingestTs = taosGetTimestampMs();
|
||||||
|
|
||||||
// sync info for sync module
|
// sync info for sync module
|
||||||
pWal->writeHead.head.syncMeta = syncMeta;
|
pWal->writeHead.head.syncMeta = syncMeta;
|
||||||
|
@ -457,14 +458,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
||||||
if (walInitWriteFile(pWal) < 0) {
|
if (walInitWriteFile(pWal) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0);
|
||||||
|
|
||||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
|
Loading…
Reference in New Issue