diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 2402182eae..1dbc8f2f76 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -38,8 +38,8 @@ extern "C" { #define META_READER_NOLOCK 0x1 -#define STREAM_STATE_BUFF_HASH 1 -#define STREAM_STATE_BUFF_SORT 2 +#define STREAM_STATE_BUFF_HASH 1 +#define STREAM_STATE_BUFF_SORT 2 typedef struct SMeta SMeta; typedef TSKEY (*GetTsFun)(void*); @@ -102,14 +102,14 @@ typedef struct SMTbCursor { } SMTbCursor; typedef struct SMCtbCursor { - SMeta *pMeta; - void *pCur; + SMeta* pMeta; + void* pCur; tb_uid_t suid; - void *pKey; - void *pVal; + void* pKey; + void* pVal; int kLen; int vLen; - int8_t paused; + int8_t paused; int lock; } SMCtbCursor; @@ -263,22 +263,23 @@ typedef struct SStoreMeta { void* (*storeGetIndexInfo)(); void* (*getInvertIndex)(void* pVnode); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); + int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list); int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); void* storeGetVersionRange; void* storeGetLastTimestamp; int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema - int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); - void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); + int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, + int64_t* numOfNormalTables); int64_t (*getNumOfRowsInMem)(void* pVnode); - SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); - int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); - void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); - void (*closeCtbCursor)(SMCtbCursor *pCtbCur); - tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); + SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock); + int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); + void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); + void (*closeCtbCursor)(SMCtbCursor* pCtbCur); + tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); } SStoreMeta; typedef struct SStoreMetaReader { @@ -363,14 +364,14 @@ typedef struct SStateStore { const SSessionKey* pKey, void** pVal, int32_t* pVLen); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); - TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); - bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); - bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); + bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); + bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); void (*updateInfoDestroy)(SUpdateInfo* pInfo); - void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); - void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); + void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count); + void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); @@ -397,6 +398,7 @@ typedef struct SStateStore { void (*streamStateDestroy)(SStreamState* pState, bool remove); int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); + void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst); } SStateStore; typedef struct SStorageAPI { diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 49435a6317..003e9b900a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -171,6 +171,7 @@ typedef struct { int32_t taskId; int64_t streamId; int64_t streamBackendRid; + int8_t dump; } SStreamState; typedef struct SFunctionStateStore { diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ba90c0dc7a..24222677a4 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -50,7 +50,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); -//session window +// session window int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); @@ -65,7 +65,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key); -//state window +// state window int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); @@ -96,6 +96,9 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); void streamStateReloadInfo(SStreamState* pState, TSKEY ts); + +void streamStateCopyBackend(SStreamState* src, SStreamState* dst); + SStreamStateCur* createStreamStateCursor(); /***compare func **/ diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 380be1dd38..21d813c7c0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -66,6 +66,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer } else { sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index d1e0aefca0..c605a8373e 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -14,8 +14,8 @@ */ #include "storageapi.h" -#include "tstreamUpdate.h" #include "streamState.h" +#include "tstreamUpdate.h" static void initStateStoreAPI(SStateStore* pStore); static void initFunctionStateStore(SFunctionStateStore* pStore); @@ -100,9 +100,10 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateClose = streamStateClose; pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; - pStore->streamStateDestroy= streamStateDestroy; + pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; + pStore->streamStateCopyBackend = streamStateCopyBackend; } void initFunctionStateStore(SFunctionStateStore* pStore) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a106fe148b..dd7618adc3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ -#include "tstream.h" #include "tmsgcb.h" #include "tq.h" +#include "tstream.h" typedef struct STaskUpdateEntry { int64_t streamId; @@ -24,7 +24,7 @@ typedef struct STaskUpdateEntry { } STaskUpdateEntry; int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { @@ -42,7 +42,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; + pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(cb, STREAM_QUEUE, &msg); @@ -50,10 +50,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { } int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; @@ -72,7 +72,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWLock(pMeta); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, @@ -96,7 +96,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); + void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (exist != NULL) { tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); @@ -166,7 +166,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWUnLock(pMeta); } else { if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", + vgId); pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { @@ -238,7 +239,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamDispatchReq req = {0}; - SDecoder decoder; + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { tDecoderClear(&decoder); @@ -251,7 +252,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){ + if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) { return -1; } tDeleteStreamDispatchReq(&req); @@ -355,8 +356,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.downstreamTaskId); + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId, + req.downstreamTaskId); return -1; } @@ -381,8 +382,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.upstreamTaskId); + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, + req.upstreamTaskId); return -1; } @@ -428,8 +429,9 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // only the leader node handle the check request if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + tqError( + "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); rsp.status = TASK_DOWNSTREAM_NOT_LEADER; } else { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); @@ -439,13 +441,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* p = NULL; streamTaskGetStatus(pTask, &p); - tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 + ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + ") from task:0x%x (vgId:%d), rsp check_status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } } @@ -472,7 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tDecoderClear(&decoder); tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); @@ -485,7 +488,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (pTask == NULL) { streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); + rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return -1; } @@ -496,10 +499,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe } int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; SStreamCheckpointReadyMsg req = {0}; @@ -526,7 +529,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } -int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) { +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, + bool restored) { int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -538,7 +542,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); // 1.deserialize msg and build task - int32_t size = sizeof(SStreamTask); + int32_t size = sizeof(SStreamTask); SStreamTask* pTask = taosMemoryCalloc(1, size); if (pTask == NULL) { tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); @@ -566,7 +570,8 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); + tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, + tstrerror(code)); tFreeStreamTask(pTask); return code; } @@ -603,7 +608,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); @@ -634,8 +639,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } int32_t startStreamTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); @@ -679,7 +684,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { } EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); if (ret != TSDB_CODE_SUCCESS) { code = ret; } @@ -692,8 +697,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { } int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); if (numOfTasks == 0) { @@ -703,7 +708,7 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); } @@ -716,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int32_t code = 0; int64_t st = taosGetTimestampMs(); - while(1) { + while (1) { int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); if (startVal == 0) { break; @@ -739,7 +744,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); code = streamMetaLoadAllTasks(pMeta); if (code != TSDB_CODE_SUCCESS) { @@ -780,11 +785,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); - if (pTask != NULL) { // even in halt status, the data in inputQ must be processed + if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); + pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -800,5 +805,3 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return -1; } } - - diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 9ba585caac..c323a81093 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -215,6 +215,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; + pStore->streamStateCopyBackend = streamStateCopyBackend; } void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index ef3a4d6f90..fb348e4416 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -408,6 +408,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + + if (pInfo->pState->dump == 1) { + taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); + taosMemoryFreeClear(pInfo->pState->pTdbState); + } taosMemoryFreeClear(pInfo->pState); nodesDestroyNode((SNode*)pInfo->pPhyNode); @@ -1462,7 +1467,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + qInfo("open state %p", pInfo->pState); + pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState); + //*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + + qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -3338,7 +3347,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < rows; i += winRows) { if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, - &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) { + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || + colDataIsNull_s(pKeyColInfo, i)) { i++; continue; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 276ed08785..776a9db522 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -91,8 +91,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { - stDebug("open stream state, %s", path); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + stDebug("open stream state %p, %s", pState, path); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -211,7 +211,7 @@ int32_t streamStateDelTaskDb(SStreamState* pState) { SStreamTask* pTask = pState->pTdbState->pOwner; taskDbRemoveRef(pTask->pBackend); taosMemoryFree(pTask); - return 0; + return 0; } void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; @@ -374,8 +374,8 @@ int32_t streamStateClear(SStreamState* pState) { streamStatePut(pState, &key, NULL, 0); while (1) { SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); - SWinKey delKey = {0}; - int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); streamStateFreeCur(pCur); if (code == 0) { streamStateDel(pState, &delKey); @@ -493,7 +493,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** return -1; } const SStateKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -513,7 +513,7 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo return -1; } const SWinKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -530,7 +530,7 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v return -1; } uint64_t groupId = pKey->groupId; - int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); + int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); if (code == 0) { if (pKey->groupId == groupId) { return 0; @@ -555,7 +555,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key } SStateKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -710,7 +710,8 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void #endif } -int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, + const SSessionKey* pKey, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen); #else @@ -724,9 +725,9 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa #else SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { if (key->win.skey != resKey.win.skey) { code = -1; @@ -767,7 +768,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -798,7 +799,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -830,7 +831,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -854,7 +855,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v return -1; } SStateSessionKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { return -1; } @@ -874,13 +875,13 @@ int32_t streamStateSessionClear(SStreamState* pState) { sessionWinStateClear(pState->pFileState); return streamStateSessionClear_rocksdb(pState); #else - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); while (1) { SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); streamStateSessionPut(pState, &delKey, buf, size); @@ -909,14 +910,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return -1; } SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -952,19 +953,19 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen); #else // todo refactor - int32_t res = 0; + int32_t res = 0; SSessionKey originKey = *key; SSessionKey searchKey = *key; searchKey.win.skey = key->win.skey - gap; searchKey.win.ekey = key->win.ekey + gap; int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); @@ -1007,16 +1008,16 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch #ifdef USE_ROCKSDB return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen); #else - int32_t res = 0; + int32_t res = 0; SSessionKey tmpKey = *key; - int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + int32_t valSize = *pVLen; + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -1115,6 +1116,20 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } +void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { + dst->pFileState = src->pFileState; + dst->parNameMap = src->parNameMap; + dst->number = src->number; + dst->taskId = src->taskId; + dst->streamId = src->streamId; + if (dst->pTdbState == NULL) { + dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); + dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask)); + } + dst->dump = 1; + dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; + return; +} SStreamStateCur* createStreamStateCursor() { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); pCur->buffIndex = -1; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index eb77e74f71..8edfb352ab 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -26,7 +26,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py #,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4 #,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4 -#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 +,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py