Merge pull request #15454 from taosdata/feature/stream

fix(stream): dispatch
This commit is contained in:
Liu Jicong 2022-07-27 12:39:37 +08:00 committed by GitHub
commit 3167f158d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 297 additions and 170 deletions

View File

@ -171,8 +171,8 @@ typedef struct {
} STaskDispatcherFixedEp; } STaskDispatcherFixedEp;
typedef struct { typedef struct {
// int8_t hashMethod;
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
int32_t waitingRspCnt;
SUseDbRsp dbInfo; SUseDbRsp dbInfo;
} STaskDispatcherShuffle; } STaskDispatcherShuffle;

View File

@ -114,18 +114,26 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb); ASSERT(pDb);
if (pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
sdbRelease(pMnode->pSdb, pDb); }
sdbRelease(pMnode->pSdb, pDb);
}
if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs); int32_t sz = taosArrayGetSize(pVgs);

View File

@ -468,7 +468,6 @@ typedef struct SStreamScanInfo {
SSDataBlock* pUpdateDataRes; SSDataBlock* pUpdateDataRes;
// status for tmq // status for tmq
// SSchemaWrapper schema; // SSchemaWrapper schema;
STqOffset offset;
SNodeList* pGroupTags; SNodeList* pGroupTags;
SNode* pTagCond; SNode* pTagCond;
SNode* pTagIndexCond; SNode* pTagIndexCond;

View File

@ -14,12 +14,12 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tref.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "vnode.h" #include "tref.h"
#include "tudf.h" #include "tudf.h"
#include "vnode.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1; int32_t exchangeObjRefPool = -1;
@ -95,16 +95,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
#if 0
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
}
#endif
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
} }
@ -258,7 +248,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
// todo refactor STableList // todo refactor STableList
size_t bufLen = (pScanInfo->pGroupTags != NULL)? getTableTagsBufLen(pScanInfo->pGroupTags):0; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
char* keyBuf = NULL; char* keyBuf = NULL;
if (bufLen > 0) { if (bufLen > 0) {
keyBuf = taosMemoryMalloc(bufLen); keyBuf = taosMemoryMalloc(bufLen);
@ -267,7 +257,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
} }
for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
@ -352,7 +342,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
} }
} }
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }
@ -576,11 +566,6 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
return pInfo->tqReader->pSchemaWrapper; return pInfo->tqReader->pSchemaWrapper;
} }
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return &pInfo->offset;
}
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
@ -603,7 +588,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
while (1) { while (1) {
uint8_t type = pOperator->operatorType; uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { // TODO add more check
if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
@ -684,35 +674,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
} }
} }
return 0; return 0;
} }
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
}
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
#endif

View File

@ -1396,13 +1396,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
#if 1
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
taosMemoryFreeClear(pStreamScan->pTableScanOp); taosMemoryFreeClear(pStreamScan->pTableScanOp);
} }
#endif
if (pStreamScan->tqReader) { if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader); tqCloseReader(pStreamScan->tqReader);
} }
@ -2855,7 +2853,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2874,7 +2873,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL; return (pResBlock->info.rows > 0) ? pResBlock : NULL;
} }
@ -2905,7 +2903,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) { while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId; pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;

View File

@ -775,7 +775,7 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) { int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SPullWindowInfo* pos = taosArrayGet(res, index); SPullWindowInfo* pos = taosArrayGet(res, index);
SPullWindowInfo* pData = (SPullWindowInfo*) pKey; SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
if (pData->window.skey == pos->window.skey) { if (pData->window.skey == pos->window.skey) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
return 1; return 1;
@ -810,7 +810,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
int32_t compareResKey(void* pKey, void* data, int32_t index) { int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index); SResKeyPos* pos = taosArrayGetP(res, index);
SWinRes* pData = (SWinRes*) pKey; SWinRes* pData = (SWinRes*)pKey;
if (pData->ts == *(int64_t*)pos->key) { if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
return 1; return 1;
@ -880,7 +880,7 @@ int64_t getWinReskey(void* data, int32_t index) {
int32_t compareWinRes(void* pKey, void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinRes* pos = taosArrayGetP(res, index); SWinRes* pos = taosArrayGetP(res, index);
SResKeyPos* pData = (SResKeyPos*) pKey; SResKeyPos* pData = (SResKeyPos*)pKey;
if (*(int64_t*)pData->key == pos->ts) { if (*(int64_t*)pData->key == pos->ts) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
return 1; return 1;
@ -1596,7 +1596,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pBlock->info.type == STREAM_NORMAL) { if (pBlock->info.type == STREAM_NORMAL) {
//set input version // set input version
pTaskInfo->version = pBlock->info.version; pTaskInfo->version = pBlock->info.version;
} }
@ -1644,7 +1644,7 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
} }
static void freeItem(void* param) { static void freeItem(void* param) {
SGroupKeys *pKey = (SGroupKeys*) param; SGroupKeys* pKey = (SGroupKeys*)param;
taosMemoryFree(pKey->pData); taosMemoryFree(pKey->pData);
} }
@ -2970,8 +2970,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.type = type; pBlock->info.type = type;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + pBlock->info.rowSize =
sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};

View File

@ -38,7 +38,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);

View File

@ -219,6 +219,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
qDebug("task %d receive dispatch rsp", pTask->taskId); qDebug("task %d receive dispatch rsp", pTask->taskId);
if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
if (leftRsp > 0) return 0;
}
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT); ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {

View File

@ -198,6 +198,158 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
return 0; return 0;
} }
int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
if (code < 0) goto FAIL;
code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType;
tmsgSendReq(pEpSet, &msg);
code = 0;
FAIL:
if (code < 0 && buf) rpcFreeCont(buf);
return 0;
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(pData->blocks);
ASSERT(blockNum != 0);
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.dataSrcVgId = pData->srcVgId,
.upstreamTaskId = pTask->taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
goto FAIL_FIXED_DISPATCH;
}
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
goto FAIL_FIXED_DISPATCH;
}
}
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
req.taskId = downstreamTaskId;
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
if (streamDispatchOneReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH;
}
code = 0;
FAIL_FIXED_DISPATCH:
taosArrayDestroy(req.data);
taosArrayDestroy(req.dataLen);
return code;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0);
int32_t vgSz = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
return -1;
}
for (int32_t i = 0; i < vgSz; i++) {
pReqs[i].streamId = pTask->streamId;
pReqs[i].dataSrcVgId = pData->srcVgId;
pReqs[i].upstreamTaskId = pTask->taskId;
pReqs[i].upstreamChildId = pTask->selfChildId;
pReqs[i].upstreamNodeId = pTask->nodeId;
pReqs[i].blockNum = 0;
pReqs[i].data = taosArrayInit(0, sizeof(void*));
pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
goto FAIL_SHUFFLE_DISPATCH;
}
}
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId);
// TODO: get hash function by hashMethod
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
pReqs[j].taskId = pVgInfo->taskId;
pReqs[j].blockNum++;
break;
}
}
}
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
// send
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
pTask->shuffleDispatcher.waitingRspCnt++;
}
}
code = 0;
FAIL_SHUFFLE_DISPATCH:
if (pReqs) {
for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroy(pReqs[i].data);
taosArrayDestroy(pReqs[i].dataLen);
}
taosMemoryFree(pReqs);
}
return code;
} else {
ASSERT(0);
}
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
@ -262,29 +414,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId); downstreamTaskId, vgId);
// serialize streamDispatchOneReq(pTask, &req, vgId, *ppEpSet);
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
if (code < 0) goto FAIL;
code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
pMsg->contLen = tlen + sizeof(SMsgHead);
pMsg->pCont = buf;
pMsg->msgType = pTask->dispatchMsgType;
code = 0; code = 0;
FAIL: FAIL:
@ -314,6 +444,18 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
qDebug("stream continue dispatching: task %d", pTask->taskId); qDebug("stream continue dispatching: task %d", pTask->taskId);
int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
ASSERT(0);
code = -1;
// TODO set status fail
goto FREE;
}
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
FREE:
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
#if 0
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL; SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
@ -325,5 +467,6 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg); tmsgSendReq(pEpSet, &dispatchMsg);
return 0; #endif
return code;
} }

View File

@ -83,7 +83,16 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// set config // set config
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->cfg.retentionSize > 0) {
pWal->cfg.retentionSize *= 1024;
}
if (pWal->cfg.segSize > 0) {
pWal->cfg.segSize *= 1024;
}
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));

View File

@ -401,6 +401,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
pWal->writeHead.head.ingestTs = taosGetTimestampMs();
// sync info for sync module // sync info for sync module
pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.head.syncMeta = syncMeta;
@ -457,14 +458,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
return -1; return -1;
} }
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) { if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
} }
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0); ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);