enh(stream): using term from sync module to denote the upstream node is restarted/replica-modified/vnode-transferred or not.

This commit is contained in:
Haojun Liao 2023-08-15 09:25:05 +08:00
parent 1f792f09c4
commit b1d35f43ba
10 changed files with 97 additions and 64 deletions

View File

@ -120,12 +120,11 @@ typedef struct {
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t nodeId; // nodeId, from SStreamMeta
int32_t srcVgId; int32_t srcVgId;
int32_t srcTaskId; int32_t srcTaskId;
int64_t sourceVer; int64_t sourceVer;
int64_t reqId; int64_t reqId;
SArray* blocks; // SArray<SSDataBlock> SArray* blocks; // SArray<SSDataBlock>
} SStreamDataBlock; } SStreamDataBlock;
@ -250,6 +249,7 @@ typedef struct SStreamChildEpInfo {
int32_t taskId; int32_t taskId;
SEpSet epSet; SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo; } SStreamChildEpInfo;
typedef struct SStreamId { typedef struct SStreamId {
@ -272,7 +272,6 @@ typedef struct SStreamStatus {
bool transferState; bool transferState;
int8_t timerActive; // timer is active int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t stage; // rollback will increase this attribute one for each time
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SHistDataRange {
@ -379,6 +378,7 @@ typedef struct SStreamMeta {
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
int64_t stage;
SRWLatch lock; SRWLatch lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
@ -420,6 +420,7 @@ typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t type; int32_t type;
int64_t stage; //nodeId from upstream task
int32_t srcVgId; int32_t srcVgId;
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t upstreamChildId; int32_t upstreamChildId;
@ -459,13 +460,13 @@ typedef struct {
typedef struct { typedef struct {
int64_t reqId; int64_t reqId;
int64_t stage;
int64_t streamId; int64_t streamId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t downstreamNodeId; int32_t downstreamNodeId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t childId; int32_t childId;
int32_t stage;
} SStreamTaskCheckReq; } SStreamTaskCheckReq;
typedef struct { typedef struct {
@ -476,7 +477,7 @@ typedef struct {
int32_t downstreamNodeId; int32_t downstreamNodeId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t childId; int32_t childId;
int32_t stage; int32_t oldStage;
int8_t status; int8_t status;
} SStreamTaskCheckRsp; } SStreamTaskCheckRsp;
@ -626,7 +627,6 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@ -634,9 +634,8 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history // recover and fill history
void streamTaskCheckDownstreamTasks(SStreamTask* pTask); void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask);
@ -681,7 +680,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
void streamMetaInit(); void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
// save to stream meta store // save to stream meta store

View File

@ -95,7 +95,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
} }
pSnode->msgCb = pOption->msgCb; pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE); pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1);
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
@ -325,7 +325,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.stage); rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);

View File

@ -225,6 +225,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq); int32_t tqCheckStreamStatus(STQ* pTq);
int64_t tqGetNodeStage(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -128,7 +128,8 @@ int32_t tqInitialize(STQ* pTq) {
return -1; return -1;
} }
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); int64_t stage = tqGetNodeStage(pTq);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, stage);
if (pTq->pStreamMeta == NULL) { if (pTq->pStreamMeta == NULL) {
return -1; return -1;
} }
@ -1064,16 +1065,14 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.stage); rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
rsp.stage = pTask->status.stage;
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.stage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
rsp.stage = 0;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d", ") from task:0x%x (vgId:%d), rsp status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
@ -1928,8 +1927,11 @@ int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// tDecoderClear(&decoder);
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
return 0; return 0;
} }
int64_t tqGetNodeStage(STQ* pTq) {
SSyncState state = syncGetState(pTq->pVnode->sync);
return state.term;
}

View File

@ -26,6 +26,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
// update the cached nodeId when reading data from WAL files.
int64_t nodeStage = tqGetNodeStage(pTq);
if (pMeta->stage != nodeStage) {
pMeta->stage = nodeStage;
}
while (1) { while (1) {
int32_t scan = pMeta->walScanCounter; int32_t scan = pMeta->walScanCounter;
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
@ -80,13 +86,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
continue; continue;
} }
if (pTask->info.fillHistory == 1) { streamTaskCheckDownstreamTasks(pTask);
tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr,
pTask->streamTaskId.taskId);
continue;
}
streamTaskDoCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -285,6 +285,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);

View File

@ -241,14 +241,21 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
int32_t status = 0; int32_t status = 0;
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL); ASSERT(pInfo != NULL);
// upstream task has restarted/leader-follower switch/transferred to other dnodes
if (pReq->stage > pInfo->stage) {
qError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
", current:%" PRId64 " dispatch msg rejected",
pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
status = TASK_INPUT_STATUS__BLOCKED;
} else {
if (!pInfo->dataAllowed) { if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId); qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr,
pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;
} else { } else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
@ -259,6 +266,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
status = streamTaskAppendInputBlocks(pTask, pReq); status = streamTaskAppendInputBlocks(pTask, pReq);
} }
}
{ {
// do send response with the input status // do send response with the input status

View File

@ -49,6 +49,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
@ -81,6 +82,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->srcVgId = vgId; pReq->srcVgId = vgId;
pReq->stage = pTask->pMeta->stage;
pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId; pReq->upstreamNodeId = pTask->info.nodeId;
@ -457,27 +459,27 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
const char* id = pTask->id.idStr;
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue); int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue);
if (numOfElems > 0) { if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", pTask->id.idStr, qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems);
numOfElems);
} }
// to make sure only one dispatch is running // to make sure only one dispatch is running
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old);
return 0; return 0;
} }
ASSERT(pTask->msgInfo.pData == NULL); ASSERT(pTask->msgInfo.pData == NULL);
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputInfo.status); qDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputInfo.status);
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue);
if (pBlock == NULL) { if (pBlock == NULL) {
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputInfo.status); qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputInfo.status);
return 0; return 0;
} }
@ -492,7 +494,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
break; break;
} }
qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", id,
tstrerror(terrno), pTask->outputInfo.status, retryCount); tstrerror(terrno), pTask->outputInfo.status, retryCount);
// todo deal with only partially success dispatch case // todo deal with only partially success dispatch case
@ -581,11 +583,12 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/sync/sync.h>
#include "executor.h" #include "executor.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
@ -38,7 +39,7 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamBackendCfWrapperId);
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
@ -92,6 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId; pMeta->vgId = vgId;
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
// send heartbeat every 20sec. // send heartbeat every 20sec.
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);

View File

@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <tstream.h>
#include "streamInt.h" #include "streamInt.h"
#include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
#include "trpc.h"
typedef struct SStreamTaskRetryInfo { typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta; SStreamMeta* pMeta;
@ -116,7 +117,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
.upstreamTaskId = pTask->id.taskId, .upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId, .upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId, .childId = pTask->info.selfChildId,
.stage = pTask->status.stage, .stage = pTask->pMeta->stage,
}; };
// serialize // serialize
@ -174,7 +175,7 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
.downstreamTaskId = pRsp->downstreamTaskId, .downstreamTaskId = pRsp->downstreamTaskId,
.downstreamNodeId = pRsp->downstreamNodeId, .downstreamNodeId = pRsp->downstreamNodeId,
.childId = pRsp->childId, .childId = pRsp->childId,
.stage = pTask->status.stage, .stage = pTask->pMeta->stage,
}; };
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
@ -197,8 +198,21 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
return 0; return 0;
} }
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage) { int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) {
return ((pTask->status.downstreamReady == 1) && (pTask->status.stage == stage))? 1:0; SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL && pInfo->stage <= stage);
if (pInfo->stage == -1) {
pInfo->stage = stage;
qDebug("s-task:%s receive msg from upstream task:0x%x, init stage value:%"PRId64, pTask->id.idStr, upstreamTaskId, stage);
}
if (pInfo->stage < stage) {
qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
pTask->id.idStr, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == upstreamTaskId))? 1:0;
} }
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
@ -266,7 +280,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id, qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->stage); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
taosMsleep(100); taosMsleep(100);
streamRecheckDownstream(pTask, pRsp); streamRecheckDownstream(pTask, pRsp);
} }
@ -664,7 +678,7 @@ int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq*
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->stage) < 0) return -1; if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -678,7 +692,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->stage) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
@ -692,7 +706,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp*
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->stage) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -707,7 +721,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->stage) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -785,15 +799,13 @@ void launchFillHistoryTask(SStreamTask* pTask) {
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
// only the downstream tasks are ready, set the task to be ready to work.
void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
return; return;
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
// check downstream tasks for itself
streamTaskDoCheckDownstreamTasks(pTask); streamTaskDoCheckDownstreamTasks(pTask);
} }

View File

@ -13,9 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tmisce.h" #include "sync.h"
#include "tstream.h"
#include "executor.h" #include "executor.h"
#include "streamInt.h" #include "streamInt.h"
#include "tmisce.h"
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
@ -60,6 +62,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
/*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/ /*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
return 0; return 0;
} }
@ -69,6 +72,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
/*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/ /*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
return 0; return 0;
} }
@ -382,6 +386,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
pEpInfo->epSet = pTask->info.epSet; pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId; pEpInfo->taskId = pTask->id.taskId;
pEpInfo->stage = -1;
return pEpInfo; return pEpInfo;
} }
@ -467,9 +472,10 @@ int32_t streamTaskStop(SStreamTask* pTask) {
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) { int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t stage = pTask->pMeta->stage;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
qDebug("s-task:%s vgId:%d restart current task, stage:%d, status:%s, sched-status:%d", id, vgId, pTask->status.stage, qDebug("s-task:%s vgId:%d restart current task, stage:%"PRId64", status:%s, sched-status:%d", id, vgId, stage,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// 1. stop task // 1. stop task
@ -482,8 +488,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
taosArrayClear(pTask->pRspMsgList); taosArrayClear(pTask->pRspMsgList);
pTask->status.downstreamReady = 0; pTask->status.downstreamReady = 0;
pTask->status.stage += 1;
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
taosWLockLatch(&pTask->pMeta->lock); taosWLockLatch(&pTask->pMeta->lock);
@ -491,8 +495,9 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
streamMetaCommit(pTask->pMeta); streamMetaCommit(pTask->pMeta);
taosWUnLockLatch(&pTask->pMeta->lock); taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, ASSERT(0);
pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); // qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
// 3. start to check the downstream status // 3. start to check the downstream status
if (startTask) { if (startTask) {