Merge pull request #24097 from taosdata/fix/TD-27834

Fix stream backend crash during transfer
This commit is contained in:
Haojun Liao 2023-12-18 23:13:08 +08:00 committed by GitHub
commit 612b50c4cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 45 deletions

View File

@ -2298,11 +2298,13 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
} }
if (pState != NULL && idx != -1) { if (pState != NULL && idx != -1) {
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_column_family_handle_t* cf = NULL; if (wrapper == NULL) {
return -1;
}
taosThreadMutexLock(&wrapper->mutex); taosThreadMutexLock(&wrapper->mutex);
cf = wrapper->pCf[idx]; rocksdb_column_family_handle_t* cf = wrapper->pCf[idx];
if (cf == NULL) { if (cf == NULL) {
char* err = NULL; char* err = NULL;
cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err); cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err);

View File

@ -262,9 +262,17 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
} }
STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId); STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId);
while (1) {
if (pBackend == NULL) { if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
return -1; taosMsleep(1000);
stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key);
} else {
taosThreadMutexUnlock(&pMeta->backendMutex);
break;
}
taosThreadMutexLock(&pMeta->backendMutex);
pBackend = taskDbOpen(pMeta->path, key, chkpId);
} }
int64_t tref = taosAddRef(taskDbWrapperId, pBackend); int64_t tref = taosAddRef(taskDbWrapperId, pBackend);

View File

@ -14,10 +14,10 @@
*/ */
#include "streamInt.h" #include "streamInt.h"
#include "streamsm.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
#include "streamsm.h"
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms #define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec #define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
@ -115,8 +115,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
// release the task. // release the task.
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
} else { } else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
&pTask->schedHistoryInfo.pTimer);
} }
} }
@ -135,7 +134,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
pTask->schedHistoryInfo.numOfTicks = numOfTicks; pTask->schedHistoryInfo.numOfTicks = numOfTicks;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) { if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
@ -290,7 +289,8 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
} }
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL); ASSERT(pInfo != NULL);
@ -374,7 +374,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event; EStreamTaskEvent event;
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT; event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
} else { } else {
event = TASK_EVENT_INIT_SCANHIST; event = TASK_EVENT_INIT_SCANHIST;
} }
@ -418,7 +418,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pRsp->status == TASK_DOWNSTREAM_READY) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false; bool found = false;
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
for (int32_t i = 0; i < numOfReqs; i++) { for (int32_t i = 0; i < numOfReqs; i++) {
@ -457,8 +456,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError( stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " ", current stage:%" PRId64
", "
"not check wait for downstream task nodeUpdate, and all tasks restart", "not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
@ -491,7 +491,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer);
} }
@ -501,7 +501,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo *pRpcInfo, int32_t taskId) { SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder; SEncoder encoder;
int32_t code; int32_t code;
int32_t len; int32_t len;
@ -538,11 +538,11 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
} }
// source // source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
} }
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
} }
@ -572,7 +572,7 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
pBlock->info.rows = 1; pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId; pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock); taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock); taosMemoryFree(pBlock);
@ -606,8 +606,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p,
id, p, pReq->upstreamTaskId); pReq->upstreamTaskId);
void* pBuf = NULL; void* pBuf = NULL;
int32_t len = 0; int32_t len = 0;
@ -617,8 +617,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
tmsgSendRsp(&msg); tmsgSendRsp(&msg);
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel,
taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); pReq->upstreamTaskId, pReq->upstreamNodeId);
return 0; return 0;
} }
@ -647,13 +647,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
if (taskLevel == TASK_LEVEL__AGG) { if (taskLevel == TASK_LEVEL__AGG) {
/*int32_t code = */streamTaskScanHistoryDataComplete(pTask); /*int32_t code = */ streamTaskScanHistoryDataComplete(pTask);
} else { // for sink task, set normal } else { // for sink task, set normal
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
} }
} else { } else {
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id,
id, pReq->upstreamTaskId, pReq->childId, left); pReq->upstreamTaskId, pReq->childId, left);
} }
return 0; return 0;
@ -697,9 +697,9 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
" verRange:%" PRId64 " - %" PRId64", init:%"PRId64, " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init); pRange->range.maxVer, pHTask->execInfo.init);
} else { } else {
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
} }
@ -734,7 +734,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
if (pTask != NULL) { if (pTask != NULL) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
pHTaskInfo->tickCount -= 1; pHTaskInfo->tickCount -= 1;