Merge pull request #25510 from taosdata/fix/3_liaohj
fix(stream): handle disorder node Update trans Id.
This commit is contained in:
commit
c565e31638
|
@ -424,7 +424,7 @@ typedef struct STaskOutputInfo {
|
||||||
};
|
};
|
||||||
int8_t type;
|
int8_t type;
|
||||||
STokenBucket* pTokenBucket;
|
STokenBucket* pTokenBucket;
|
||||||
SArray* pDownstreamUpdateList;
|
SArray* pNodeEpsetUpdateList;
|
||||||
} STaskOutputInfo;
|
} STaskOutputInfo;
|
||||||
|
|
||||||
typedef struct SUpstreamInfo {
|
typedef struct SUpstreamInfo {
|
||||||
|
@ -445,6 +445,8 @@ typedef struct STaskCheckInfo {
|
||||||
int32_t notReadyTasks;
|
int32_t notReadyTasks;
|
||||||
int32_t inCheckProcess;
|
int32_t inCheckProcess;
|
||||||
int32_t stopCheckProcess;
|
int32_t stopCheckProcess;
|
||||||
|
int32_t notReadyRetryCount;
|
||||||
|
int32_t timeoutRetryCount;
|
||||||
tmr_h checkRspTmr;
|
tmr_h checkRspTmr;
|
||||||
TdThreadMutex checkInfoLock;
|
TdThreadMutex checkInfoLock;
|
||||||
} STaskCheckInfo;
|
} STaskCheckInfo;
|
||||||
|
@ -848,7 +850,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
||||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||||
int32_t* pNotReady, const char* id);
|
int32_t* pNotReady, const char* id);
|
||||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
|
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
||||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
|
||||||
|
|
|
@ -827,7 +827,11 @@ TEST(clientCase, projection_query_tables) {
|
||||||
// }
|
// }
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use cache_1");
|
TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to exec query, %s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
|
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
|
||||||
|
|
|
@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsCompactPullupInterval = 10;
|
int32_t tsCompactPullupInterval = 10;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointInterval = 60;
|
int32_t tsStreamCheckpointInterval = 300;
|
||||||
float tsSinkDataRate = 2.0;
|
float tsSinkDataRate = 2.0;
|
||||||
int32_t tsStreamNodeCheckInterval = 16;
|
int32_t tsStreamNodeCheckInterval = 16;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
|
|
|
@ -195,13 +195,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
const char* idstr = pTask->id.idStr;
|
const char* idstr = pTask->id.idStr;
|
||||||
|
|
||||||
if (pMeta->updateInfo.transId != req.transId) {
|
if (pMeta->updateInfo.transId != req.transId) {
|
||||||
ASSERT(req.transId > pMeta->updateInfo.transId);
|
if (req.transId < pMeta->updateInfo.transId) {
|
||||||
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
|
||||||
vgId, req.transId, pMeta->updateInfo.transId);
|
pMeta->updateInfo.transId, req.transId);
|
||||||
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
taosArrayDestroy(req.pNodeList);
|
||||||
taosHashClear(pMeta->updateInfo.pTasks);
|
return rsp.code;
|
||||||
pMeta->updateInfo.transId = req.transId;
|
} else {
|
||||||
|
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
||||||
|
vgId, req.transId, pMeta->updateInfo.transId);
|
||||||
|
|
||||||
|
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||||
|
taosHashClear(pMeta->updateInfo.pTasks);
|
||||||
|
pMeta->updateInfo.transId = req.transId;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1054,14 +1054,30 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) {
|
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo, SBlockLoadSuppInfo* pSupp) {
|
||||||
record->uid = pBlockInfo->uid;
|
record->uid = pBlockInfo->uid;
|
||||||
record->firstKey = (STsdbRowKey){
|
record->firstKey = (STsdbRowKey){.key = {.ts = pBlockInfo->firstKey, .numOfPKs = pSupp->numOfPks}};
|
||||||
.key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0},
|
record->lastKey = (STsdbRowKey){.key = {.ts = pBlockInfo->lastKey, .numOfPKs = pSupp->numOfPks}};
|
||||||
};
|
|
||||||
record->lastKey = (STsdbRowKey){
|
if (pSupp->numOfPks > 0) {
|
||||||
.key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0},
|
SValue* pFirst = &record->firstKey.key.pks[0];
|
||||||
};
|
SValue* pLast = &record->lastKey.key.pks[0];
|
||||||
|
|
||||||
|
pFirst->type = pSupp->pk.type;
|
||||||
|
pLast->type = pSupp->pk.type;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pFirst->type)) {
|
||||||
|
pFirst->pData = (uint8_t*) varDataVal(pBlockInfo->firstPk.pData);
|
||||||
|
pFirst->nData = varDataLen(pBlockInfo->firstPk.pData);
|
||||||
|
|
||||||
|
pLast->pData = (uint8_t*) varDataVal(pBlockInfo->lastPk.pData);
|
||||||
|
pLast->nData = varDataLen(pBlockInfo->lastPk.pData);
|
||||||
|
} else {
|
||||||
|
pFirst->val = pBlockInfo->firstPk.val;
|
||||||
|
pLast->val = pBlockInfo->lastPk.val;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
record->minVer = pBlockInfo->minVer;
|
record->minVer = pBlockInfo->minVer;
|
||||||
record->maxVer = pBlockInfo->maxVer;
|
record->maxVer = pBlockInfo->maxVer;
|
||||||
record->blockOffset = pBlockInfo->blockOffset;
|
record->blockOffset = pBlockInfo->blockOffset;
|
||||||
|
@ -1091,7 +1107,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
|
|
||||||
SBrinRecord tmp;
|
SBrinRecord tmp;
|
||||||
blockInfoToRecord(&tmp, pBlockInfo);
|
blockInfoToRecord(&tmp, pBlockInfo, pSupInfo);
|
||||||
SBrinRecord* pRecord = &tmp;
|
SBrinRecord* pRecord = &tmp;
|
||||||
|
|
||||||
// no data exists, return directly.
|
// no data exists, return directly.
|
||||||
|
@ -1290,7 +1306,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
SBrinRecord tmp;
|
SBrinRecord tmp;
|
||||||
blockInfoToRecord(&tmp, pBlockInfo);
|
blockInfoToRecord(&tmp, pBlockInfo, pSup);
|
||||||
SBrinRecord* pRecord = &tmp;
|
SBrinRecord* pRecord = &tmp;
|
||||||
code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1],
|
code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1],
|
||||||
pSup->numOfCols - 1);
|
pSup->numOfCols - 1);
|
||||||
|
@ -1327,7 +1343,7 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
||||||
|
|
||||||
static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
|
static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
|
||||||
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
|
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
|
||||||
SBrinRecord* pRecord) {
|
SBrinRecord* pRecord, SBlockLoadSuppInfo* pSupInfo) {
|
||||||
bool asc = ASCENDING_TRAVERSE(order);
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
|
|
||||||
|
@ -1341,7 +1357,7 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn
|
||||||
|
|
||||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||||
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||||
blockInfoToRecord(pRecord, p);
|
blockInfoToRecord(pRecord, p, pSupInfo);
|
||||||
|
|
||||||
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||||
return true;
|
return true;
|
||||||
|
@ -1464,7 +1480,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
bool hasNeighbor =
|
bool hasNeighbor =
|
||||||
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec);
|
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec, pSupInfo);
|
||||||
|
|
||||||
// overlap with neighbor
|
// overlap with neighbor
|
||||||
if (hasNeighbor) {
|
if (hasNeighbor) {
|
||||||
|
@ -1473,7 +1489,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
SBrinRecord pRecord;
|
SBrinRecord pRecord;
|
||||||
blockInfoToRecord(&pRecord, pBlockInfo);
|
blockInfoToRecord(&pRecord, pBlockInfo, pSupInfo);
|
||||||
|
|
||||||
// has duplicated ts of different version in this block
|
// has duplicated ts of different version in this block
|
||||||
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
||||||
|
@ -2411,15 +2427,16 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
|
|
||||||
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
||||||
STsdbReader* pReader, bool* loadNeighbor) {
|
STsdbReader* pReader, bool* loadNeighbor) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t order = pReader->info.order;
|
int32_t order = pReader->info.order;
|
||||||
SDataBlockIter* pIter = &pReader->status.blockIter;
|
SDataBlockIter* pIter = &pReader->status.blockIter;
|
||||||
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
int32_t nextIndex = -1;
|
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||||
SBrinRecord rec = {0};
|
int32_t nextIndex = -1;
|
||||||
|
SBrinRecord rec = {0};
|
||||||
|
|
||||||
*loadNeighbor = false;
|
*loadNeighbor = false;
|
||||||
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec);
|
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec, pSupInfo);
|
||||||
if (!hasNeighbor) { // do nothing
|
if (!hasNeighbor) { // do nothing
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4868,11 +4885,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
if (pResBlock->info.id.uid != pFBlock->uid) {
|
if (pResBlock->info.id.uid != pBlockInfo->uid) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4880,10 +4897,10 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
TARRAY2_CLEAR(&pSup->colAggArray, 0);
|
TARRAY2_CLEAR(&pSup->colAggArray, 0);
|
||||||
|
|
||||||
SBrinRecord pRecord;
|
SBrinRecord pRecord;
|
||||||
blockInfoToRecord(&pRecord, pFBlock);
|
blockInfoToRecord(&pRecord, pBlockInfo, pSup);
|
||||||
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pRecord, &pSup->colAggArray);
|
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pRecord, &pSup->colAggArray);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
|
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pBlockInfo->uid, tstrerror(code),
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4912,7 +4929,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
// do fill all null column value SMA info
|
// do fill all null column value SMA info
|
||||||
doFillNullColSMA(pSup, pFBlock->numRow, numOfCols, pTsAgg);
|
doFillNullColSMA(pSup, pBlockInfo->numRow, numOfCols, pTsAgg);
|
||||||
|
|
||||||
size_t size = pSup->colAggArray.size;
|
size_t size = pSup->colAggArray.size;
|
||||||
|
|
||||||
|
@ -4938,7 +4955,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
pReader->cost.smaLoadTime += 0; // elapsedTime;
|
pReader->cost.smaLoadTime += 0; // elapsedTime;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
|
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pBlockInfo->uid, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,665 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "cos.h"
|
||||||
|
#include "rsync.h"
|
||||||
|
#include "streamBackendRocksdb.h"
|
||||||
|
#include "streamInt.h"
|
||||||
|
|
||||||
|
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
|
||||||
|
|
||||||
|
static void processDownstreamReadyRsp(SStreamTask* pTask);
|
||||||
|
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
|
||||||
|
static void rspMonitorFn(void* param, void* tmrId);
|
||||||
|
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
||||||
|
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
|
||||||
|
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
|
||||||
|
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||||
|
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
|
||||||
|
|
||||||
|
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
||||||
|
|
||||||
|
// check status
|
||||||
|
void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
|
SDataRange* pRange = &pTask->dataRange;
|
||||||
|
STimeWindow* pWindow = &pRange->window;
|
||||||
|
|
||||||
|
SStreamTaskCheckReq req = {
|
||||||
|
.streamId = pTask->id.streamId,
|
||||||
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
|
.upstreamNodeId = pTask->info.nodeId,
|
||||||
|
.childId = pTask->info.selfChildId,
|
||||||
|
.stage = pTask->pMeta->stage,
|
||||||
|
};
|
||||||
|
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
// serialize streamProcessScanHistoryFinishRsp
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
|
||||||
|
req.reqId = tGenIdPI64();
|
||||||
|
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||||
|
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
|
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||||
|
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
||||||
|
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||||
|
|
||||||
|
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||||
|
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
|
||||||
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
||||||
|
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
req.reqId = tGenIdPI64();
|
||||||
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
|
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
||||||
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
}
|
||||||
|
} else { // for sink task, set it ready directly.
|
||||||
|
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||||
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
processDownstreamReadyRsp(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
||||||
|
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
||||||
|
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
int32_t left = -1;
|
||||||
|
|
||||||
|
if (streamTaskShouldStop(pTask)) {
|
||||||
|
stDebug("s-task:%s should stop, do not do check downstream again", id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (left == 0) {
|
||||||
|
processDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
||||||
|
streamTaskStopMonitorCheckRsp(pInfo, id);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
|
}
|
||||||
|
} else { // not ready, wait for 100ms and retry
|
||||||
|
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_SUCCESS; // return success in any cases.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||||
|
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
||||||
|
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
|
||||||
|
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
|
||||||
|
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
|
||||||
|
} else {
|
||||||
|
stError(
|
||||||
|
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
|
||||||
|
"downstream again, nodeUpdate needed",
|
||||||
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
|
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t startTs = pTask->execInfo.checkTs;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
|
||||||
|
|
||||||
|
// automatically set the related fill-history task to be failed.
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
||||||
|
}
|
||||||
|
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||||
|
ASSERT(left > 0);
|
||||||
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
||||||
|
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
|
if (p != NULL) {
|
||||||
|
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pInfo->pList, &info);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||||
|
|
||||||
|
if (pInfo->checkRspTmr == NULL) {
|
||||||
|
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||||
|
} else {
|
||||||
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
|
|
||||||
|
pInfo->stopCheckProcess = 1;
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
stDebug("s-task:%s set stop check rsp mon", id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
||||||
|
ASSERT(pInfo->inCheckProcess == 0);
|
||||||
|
|
||||||
|
pInfo->pList = taosArrayDestroy(pInfo->pList);
|
||||||
|
if (pInfo->checkRspTmr != NULL) {
|
||||||
|
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
|
||||||
|
pInfo->checkRspTmr = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexDestroy(&pInfo->checkInfoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
void processDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
|
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||||
|
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||||
|
|
||||||
|
int64_t checkTs = pTask->execInfo.checkTs;
|
||||||
|
int64_t readyTs = pTask->execInfo.readyTs;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
||||||
|
|
||||||
|
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||||
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
||||||
|
|
||||||
|
// halt it self for count window stream task until the related fill history task completed.
|
||||||
|
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||||
|
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
||||||
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the related fill-history task, when current task is ready
|
||||||
|
// not invoke in success callback due to the deadlock.
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
|
||||||
|
streamLaunchFillHistoryTask(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
|
bool existed = false;
|
||||||
|
for (int i = 0; i < num; ++i) {
|
||||||
|
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
|
||||||
|
if (p->nodeId == nodeId) {
|
||||||
|
existed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!existed) {
|
||||||
|
SDownstreamTaskEpset t = {.nodeId = nodeId};
|
||||||
|
taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
|
||||||
|
|
||||||
|
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
|
||||||
|
t.nodeId, (num + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
||||||
|
taosArrayClear(pInfo->pList);
|
||||||
|
|
||||||
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
pInfo->notReadyTasks = 1;
|
||||||
|
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->startTs = startTs;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||||
|
int32_t* pNotReady, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
|
if (p != NULL) {
|
||||||
|
|
||||||
|
if (reqId != p->reqId) {
|
||||||
|
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
||||||
|
" expired check-rsp recv from downstream task:0x%x, discarded",
|
||||||
|
id, reqId, p->reqId, taskId);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// subtract one not-ready-task, since it is ready now
|
||||||
|
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
||||||
|
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
||||||
|
} else {
|
||||||
|
*pNotReady = pInfo->notReadyTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
p->status = status;
|
||||||
|
p->rspTs = rspTs;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
|
||||||
|
reqId);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
|
if (pInfo->inCheckProcess == 0) {
|
||||||
|
pInfo->inCheckProcess = 1;
|
||||||
|
} else {
|
||||||
|
ASSERT(pInfo->startTs > 0);
|
||||||
|
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s set the in-check-procedure flag", id);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
|
||||||
|
if (lock) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pInfo->inCheckProcess) {
|
||||||
|
stWarn("s-task:%s already not in-check-procedure", id);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
||||||
|
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
|
||||||
|
|
||||||
|
pInfo->startTs = 0;
|
||||||
|
pInfo->notReadyTasks = 0;
|
||||||
|
pInfo->inCheckProcess = 0;
|
||||||
|
pInfo->stopCheckProcess = 0;
|
||||||
|
|
||||||
|
pInfo->notReadyRetryCount = 0;
|
||||||
|
pInfo->timeoutRetryCount = 0;
|
||||||
|
|
||||||
|
taosArrayClear(pInfo->pList);
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
|
SStreamTaskCheckReq req = {
|
||||||
|
.streamId = pTask->id.streamId,
|
||||||
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
|
.upstreamNodeId = pTask->info.nodeId,
|
||||||
|
.childId = pTask->info.selfChildId,
|
||||||
|
.stage = pTask->pMeta->stage,
|
||||||
|
};
|
||||||
|
|
||||||
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
req.reqId = p->reqId;
|
||||||
|
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
||||||
|
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||||
|
|
||||||
|
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||||
|
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
|
||||||
|
if (p->taskId == pVgInfo->taskId) {
|
||||||
|
req.reqId = p->reqId;
|
||||||
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
|
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
||||||
|
p->reqId);
|
||||||
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||||
|
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
(*numOfReady) += 1;
|
||||||
|
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
|
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
||||||
|
p->taskId);
|
||||||
|
(*numOfFault) += 1;
|
||||||
|
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||||
|
if (p->rspTs == 0) { // not response yet
|
||||||
|
ASSERT(p->status == -1);
|
||||||
|
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||||
|
taosArrayPush(pTimeoutList, &p->taskId);
|
||||||
|
} else { // el < CHECK_NOT_RSP_DURATION
|
||||||
|
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayPush(pNotReadyList, &p->taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
SStreamTask* pTask = param;
|
||||||
|
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||||
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int64_t el = now - pInfo->startTs;
|
||||||
|
ETaskStatus state = pStat->state;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t numOfReady = 0;
|
||||||
|
int32_t numOfFault = 0;
|
||||||
|
int32_t numOfNotRsp = 0;
|
||||||
|
int32_t numOfNotReady = 0;
|
||||||
|
int32_t numOfTimeout = 0;
|
||||||
|
|
||||||
|
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
||||||
|
|
||||||
|
if (state == TASK_STATUS__STOP) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
if (pInfo->notReadyTasks == 0) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
|
||||||
|
vgId, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
|
if (pStat->state == TASK_STATUS__UNINIT) {
|
||||||
|
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
||||||
|
} else { // unexpected status
|
||||||
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
||||||
|
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
|
// fault tasks detected, not try anymore
|
||||||
|
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
||||||
|
if (numOfFault > 0) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug(
|
||||||
|
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||||
|
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
|
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// checking of downstream tasks has been stopped by other threads
|
||||||
|
if (pInfo->stopCheckProcess == 1) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug(
|
||||||
|
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
|
||||||
|
"fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
|
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfNotReady > 0) { // check to make sure not in recheck timer
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
// reset the info, and send the check msg to failure downstream again
|
||||||
|
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||||
|
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||||
|
|
||||||
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
|
if (p != NULL) {
|
||||||
|
p->rspTs = 0;
|
||||||
|
p->status = -1;
|
||||||
|
doSendCheckMsg(pTask, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->notReadyRetryCount += 1;
|
||||||
|
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
|
||||||
|
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo add into node update list and send to mnode
|
||||||
|
if (numOfTimeout > 0) {
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||||
|
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||||
|
|
||||||
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
|
if (p != NULL) {
|
||||||
|
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||||
|
doSendCheckMsg(pTask, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->timeoutRetryCount += 1;
|
||||||
|
|
||||||
|
// timeout more than 100 sec, add into node update list
|
||||||
|
if (pInfo->timeoutRetryCount > 10) {
|
||||||
|
pInfo->timeoutRetryCount = 0;
|
||||||
|
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId,
|
||||||
|
numOfTimeout);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
|
||||||
|
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
|
||||||
|
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
||||||
|
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -1073,9 +1073,9 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
for (int j = 0; j < num; ++j) {
|
for (int j = 0; j < num; ++j) {
|
||||||
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
|
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
|
||||||
|
|
||||||
bool exist = existInHbMsg(pMsg, pTaskEpset);
|
bool exist = existInHbMsg(pMsg, pTaskEpset);
|
||||||
if (!exist) {
|
if (!exist) {
|
||||||
|
@ -1085,7 +1085,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
|
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,12 +32,12 @@ typedef struct SLaunchHTaskInfo {
|
||||||
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||||
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
||||||
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
||||||
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId);
|
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId,
|
||||||
|
int32_t hTaskId);
|
||||||
static void tryLaunchHistoryTask(void* param, void* tmrId);
|
static void tryLaunchHistoryTask(void* param, void* tmrId);
|
||||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
|
static void doExecScanhistoryInFuture(void* param, void* tmrId);
|
||||||
static void doExecScanhistoryInFuture(void* param, void* tmrId);
|
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
|
||||||
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
|
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
|
||||||
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
|
|
||||||
|
|
||||||
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
@ -165,67 +165,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check status
|
|
||||||
void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|
||||||
SDataRange* pRange = &pTask->dataRange;
|
|
||||||
STimeWindow* pWindow = &pRange->window;
|
|
||||||
|
|
||||||
SStreamTaskCheckReq req = {
|
|
||||||
.streamId = pTask->id.streamId,
|
|
||||||
.upstreamTaskId = pTask->id.taskId,
|
|
||||||
.upstreamNodeId = pTask->info.nodeId,
|
|
||||||
.childId = pTask->info.selfChildId,
|
|
||||||
.stage = pTask->pMeta->stage,
|
|
||||||
};
|
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
// serialize streamProcessScanHistoryFinishRsp
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
|
||||||
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
|
||||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
|
||||||
|
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
|
||||||
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
|
||||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
|
||||||
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
|
||||||
|
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
|
||||||
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
|
||||||
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
|
||||||
req.reqId = tGenIdPI64();
|
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
|
||||||
|
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
|
||||||
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
|
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
|
||||||
}
|
|
||||||
} else { // for sink task, set it ready directly.
|
|
||||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
|
||||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
|
||||||
doProcessDownstreamReadyRsp(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||||
int64_t* oldStage) {
|
int64_t* oldStage) {
|
||||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
||||||
|
@ -327,122 +266,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
|
||||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
|
||||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
|
||||||
|
|
||||||
int64_t checkTs = pTask->execInfo.checkTs;
|
|
||||||
int64_t readyTs = pTask->execInfo.readyTs;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
|
||||||
|
|
||||||
// halt it self for count window stream task until the related fill history task completed.
|
|
||||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
|
||||||
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
|
||||||
}
|
|
||||||
|
|
||||||
// start the related fill-history task, when current task is ready
|
|
||||||
// not invoke in success callback due to the deadlock.
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
|
|
||||||
streamLaunchFillHistoryTask(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
|
||||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
|
||||||
bool existed = false;
|
|
||||||
for (int i = 0; i < num; ++i) {
|
|
||||||
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
|
|
||||||
if (p->nodeId == nodeId) {
|
|
||||||
existed = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!existed) {
|
|
||||||
SDownstreamTaskEpset t = {.nodeId = nodeId};
|
|
||||||
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
|
|
||||||
|
|
||||||
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
|
|
||||||
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
|
||||||
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
|
||||||
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
|
||||||
int32_t left = -1;
|
|
||||||
|
|
||||||
if (streamTaskShouldStop(pTask)) {
|
|
||||||
stDebug("s-task:%s should stop, do not do check downstream again", id);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
|
||||||
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (left == 0) {
|
|
||||||
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
|
||||||
streamTaskStopMonitorCheckRsp(pInfo, id);
|
|
||||||
} else {
|
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
|
||||||
}
|
|
||||||
} else { // not ready, wait for 100ms and retry
|
|
||||||
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return TSDB_CODE_SUCCESS; // return success in any cases.
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
|
||||||
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
|
||||||
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
|
|
||||||
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
|
|
||||||
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
|
|
||||||
} else {
|
|
||||||
stError(
|
|
||||||
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
|
|
||||||
"downstream again, nodeUpdate needed",
|
|
||||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
|
||||||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t startTs = pTask->execInfo.checkTs;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
|
|
||||||
|
|
||||||
// automatically set the related fill-history task to be failed.
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
|
||||||
ASSERT(left > 0);
|
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
||||||
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
|
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
|
@ -798,64 +621,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
||||||
SDataRange* pRange = &pTask->dataRange;
|
SDataRange* pRange = &pTask->dataRange;
|
||||||
|
|
|
@ -21,8 +21,6 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
|
|
||||||
|
|
||||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||||
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
||||||
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
|
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
|
||||||
|
@ -261,8 +259,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||||
pTask->streamTaskId.taskId = taskId;
|
pTask->streamTaskId.taskId = taskId;
|
||||||
|
|
||||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
|
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
|
||||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
|
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
|
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
|
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
|
||||||
|
|
||||||
|
@ -442,7 +440,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskCleanCheckInfo(&pTask->taskCheckInfo);
|
streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
|
||||||
|
|
||||||
if (pTask->pState) {
|
if (pTask->pState) {
|
||||||
stDebug("s-task:0x%x start to free task state", taskId);
|
stDebug("s-task:0x%x start to free task state", taskId);
|
||||||
|
@ -470,7 +468,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
taosMemoryFree(pTask->outputInfo.pTokenBucket);
|
taosMemoryFree(pTask->outputInfo.pTokenBucket);
|
||||||
taosThreadMutexDestroy(&pTask->lock);
|
taosThreadMutexDestroy(&pTask->lock);
|
||||||
|
|
||||||
pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
|
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
|
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
stDebug("s-task:0x%x free task completed", taskId);
|
stDebug("s-task:0x%x free task completed", taskId);
|
||||||
|
@ -571,8 +569,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
// 2MiB per second for sink task
|
// 2MiB per second for sink task
|
||||||
// 50 times sink operator per second
|
// 50 times sink operator per second
|
||||||
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
||||||
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||||
if (pOutputInfo->pDownstreamUpdateList == NULL) {
|
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
|
||||||
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -994,379 +992,3 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
|
||||||
taosArrayClear(pInfo->pList);
|
|
||||||
|
|
||||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
pInfo->notReadyTasks = 1;
|
|
||||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
|
|
||||||
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->startTs = startTs;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
|
||||||
if (p->taskId == taskId) {
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
|
||||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->pList, &info);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
|
||||||
int32_t* pNotReady, const char* id) {
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
|
|
||||||
if (reqId != p->reqId) {
|
|
||||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
|
||||||
" expired check-rsp recv from downstream task:0x%x, discarded",
|
|
||||||
id, reqId, p->reqId, taskId);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
// subtract one not-ready-task, since it is ready now
|
|
||||||
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
|
||||||
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
|
||||||
} else {
|
|
||||||
*pNotReady = pInfo->notReadyTasks;
|
|
||||||
}
|
|
||||||
|
|
||||||
p->status = status;
|
|
||||||
p->rspTs = rspTs;
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
|
|
||||||
reqId);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
|
||||||
if (pInfo->inCheckProcess == 0) {
|
|
||||||
pInfo->inCheckProcess = 1;
|
|
||||||
} else {
|
|
||||||
ASSERT(pInfo->startTs > 0);
|
|
||||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s set the in-check-procedure flag", id);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
|
||||||
if (!pInfo->inCheckProcess) {
|
|
||||||
stWarn("s-task:%s already not in-check-procedure", id);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
|
|
||||||
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
|
|
||||||
|
|
||||||
pInfo->startTs = 0;
|
|
||||||
pInfo->notReadyTasks = 0;
|
|
||||||
pInfo->inCheckProcess = 0;
|
|
||||||
pInfo->stopCheckProcess = 0;
|
|
||||||
taosArrayClear(pInfo->pList);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|
||||||
SStreamTaskCheckReq req = {
|
|
||||||
.streamId = pTask->id.streamId,
|
|
||||||
.upstreamTaskId = pTask->id.taskId,
|
|
||||||
.upstreamNodeId = pTask->info.nodeId,
|
|
||||||
.childId = pTask->info.selfChildId,
|
|
||||||
.stage = pTask->pMeta->stage,
|
|
||||||
};
|
|
||||||
|
|
||||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
|
||||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
req.reqId = p->reqId;
|
|
||||||
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
|
||||||
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
|
||||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
|
||||||
|
|
||||||
if (p->taskId == pVgInfo->taskId) {
|
|
||||||
req.reqId = p->reqId;
|
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
|
||||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
|
||||||
p->reqId);
|
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
|
||||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
|
||||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
|
||||||
(*numOfReady) += 1;
|
|
||||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
|
||||||
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
|
||||||
p->taskId);
|
|
||||||
(*numOfFault) += 1;
|
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
|
||||||
if (p->rspTs == 0) { // not response yet
|
|
||||||
ASSERT(p->status == -1);
|
|
||||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
|
||||||
taosArrayPush(pTimeoutList, &p->taskId);
|
|
||||||
} else { // el < CHECK_NOT_RSP_DURATION
|
|
||||||
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArrayPush(pNotReadyList, &p->taskId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void rspMonitorFn(void* param, void* tmrId) {
|
|
||||||
SStreamTask* pTask = param;
|
|
||||||
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
int64_t el = now - pInfo->startTs;
|
|
||||||
ETaskStatus state = pStat->state;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
int32_t numOfReady = 0;
|
|
||||||
int32_t numOfFault = 0;
|
|
||||||
int32_t numOfNotRsp = 0;
|
|
||||||
int32_t numOfNotReady = 0;
|
|
||||||
int32_t numOfTimeout = 0;
|
|
||||||
|
|
||||||
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
|
||||||
|
|
||||||
if (state == TASK_STATUS__STOP) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
if (pInfo->notReadyTasks == 0) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
|
|
||||||
vgId, ref);
|
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
|
|
||||||
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
|
||||||
|
|
||||||
if (pStat->state == TASK_STATUS__UNINIT) {
|
|
||||||
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
|
||||||
} else { // unexpected status
|
|
||||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
|
||||||
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
|
||||||
|
|
||||||
// fault tasks detected, not try anymore
|
|
||||||
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
|
||||||
if (numOfFault > 0) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug(
|
|
||||||
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
|
||||||
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
|
||||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
|
||||||
taosArrayDestroy(pTimeoutList);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// checking of downstream tasks has been stopped by other threads
|
|
||||||
if (pInfo->stopCheckProcess == 1) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug(
|
|
||||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
|
|
||||||
"fault:%d, timeout:%d, ready:%d ref:%d",
|
|
||||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
|
||||||
taosArrayDestroy(pTimeoutList);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfNotReady > 0) { // check to make sure not in recheck timer
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
// reset the info, and send the check msg to failure downstream again
|
|
||||||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
p->rspTs = 0;
|
|
||||||
p->status = -1;
|
|
||||||
doSendCheckMsg(pTask, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfTimeout > 0) {
|
|
||||||
pInfo->startTs = now;
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
|
||||||
if (p != NULL) {
|
|
||||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
|
||||||
doSendCheckMsg(pTask, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
|
|
||||||
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
|
||||||
taosArrayDestroy(pTimeoutList);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
|
||||||
|
|
||||||
if (pInfo->checkRspTmr == NULL) {
|
|
||||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
|
||||||
} else {
|
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
|
|
||||||
pInfo->stopCheckProcess = 1;
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
|
||||||
|
|
||||||
stDebug("s-task:%s set stop check rsp mon", id);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
|
|
||||||
ASSERT(pInfo->inCheckProcess == 0);
|
|
||||||
|
|
||||||
pInfo->pList = taosArrayDestroy(pInfo->pList);
|
|
||||||
if (pInfo->checkRspTmr != NULL) {
|
|
||||||
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
|
|
||||||
pInfo->checkRspTmr = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pInfo->checkInfoLock);
|
|
||||||
}
|
|
Loading…
Reference in New Issue