Merge pull request #23946 from taosdata/fix/snodeBackendCrash
Fix/snode backend crash
This commit is contained in:
commit
1a9701d401
|
@ -38,8 +38,8 @@ extern "C" {
|
||||||
|
|
||||||
#define META_READER_NOLOCK 0x1
|
#define META_READER_NOLOCK 0x1
|
||||||
|
|
||||||
#define STREAM_STATE_BUFF_HASH 1
|
#define STREAM_STATE_BUFF_HASH 1
|
||||||
#define STREAM_STATE_BUFF_SORT 2
|
#define STREAM_STATE_BUFF_SORT 2
|
||||||
|
|
||||||
typedef struct SMeta SMeta;
|
typedef struct SMeta SMeta;
|
||||||
typedef TSKEY (*GetTsFun)(void*);
|
typedef TSKEY (*GetTsFun)(void*);
|
||||||
|
@ -102,14 +102,14 @@ typedef struct SMTbCursor {
|
||||||
} SMTbCursor;
|
} SMTbCursor;
|
||||||
|
|
||||||
typedef struct SMCtbCursor {
|
typedef struct SMCtbCursor {
|
||||||
SMeta *pMeta;
|
SMeta* pMeta;
|
||||||
void *pCur;
|
void* pCur;
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
void *pKey;
|
void* pKey;
|
||||||
void *pVal;
|
void* pVal;
|
||||||
int kLen;
|
int kLen;
|
||||||
int vLen;
|
int vLen;
|
||||||
int8_t paused;
|
int8_t paused;
|
||||||
int lock;
|
int lock;
|
||||||
} SMCtbCursor;
|
} SMCtbCursor;
|
||||||
|
|
||||||
|
@ -263,22 +263,23 @@ typedef struct SStoreMeta {
|
||||||
void* (*storeGetIndexInfo)();
|
void* (*storeGetIndexInfo)();
|
||||||
void* (*getInvertIndex)(void* pVnode);
|
void* (*getInvertIndex)(void* pVnode);
|
||||||
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||||
int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list);
|
int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list);
|
||||||
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
|
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
|
||||||
void* storeGetVersionRange;
|
void* storeGetVersionRange;
|
||||||
void* storeGetLastTimestamp;
|
void* storeGetLastTimestamp;
|
||||||
|
|
||||||
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
|
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
|
||||||
int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
|
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
|
||||||
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
|
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
||||||
|
int64_t* numOfNormalTables);
|
||||||
|
|
||||||
int64_t (*getNumOfRowsInMem)(void* pVnode);
|
int64_t (*getNumOfRowsInMem)(void* pVnode);
|
||||||
|
|
||||||
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
|
SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
|
||||||
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
||||||
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
||||||
void (*closeCtbCursor)(SMCtbCursor *pCtbCur);
|
void (*closeCtbCursor)(SMCtbCursor* pCtbCur);
|
||||||
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
|
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
|
||||||
} SStoreMeta;
|
} SStoreMeta;
|
||||||
|
|
||||||
typedef struct SStoreMetaReader {
|
typedef struct SStoreMetaReader {
|
||||||
|
@ -363,14 +364,14 @@ typedef struct SStateStore {
|
||||||
const SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
const SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
||||||
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
|
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
|
||||||
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
||||||
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
|
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
|
||||||
bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
||||||
|
|
||||||
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
|
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
|
||||||
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
|
void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
|
||||||
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
|
void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
|
||||||
|
|
||||||
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp);
|
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp);
|
||||||
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
|
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
|
||||||
|
@ -397,6 +398,7 @@ typedef struct SStateStore {
|
||||||
void (*streamStateDestroy)(SStreamState* pState, bool remove);
|
void (*streamStateDestroy)(SStreamState* pState, bool remove);
|
||||||
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
|
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
|
||||||
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
|
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
|
||||||
|
void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
|
||||||
} SStateStore;
|
} SStateStore;
|
||||||
|
|
||||||
typedef struct SStorageAPI {
|
typedef struct SStorageAPI {
|
||||||
|
|
|
@ -171,6 +171,7 @@ typedef struct {
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
|
int8_t dump;
|
||||||
} SStreamState;
|
} SStreamState;
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
|
|
|
@ -50,7 +50,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number);
|
||||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||||
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||||
|
|
||||||
//session window
|
// session window
|
||||||
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
|
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
|
||||||
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
||||||
|
@ -65,7 +65,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
|
||||||
|
|
||||||
//state window
|
// state window
|
||||||
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
@ -96,6 +96,9 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char*
|
||||||
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
|
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
|
||||||
|
|
||||||
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
|
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
|
||||||
|
|
||||||
|
void streamStateCopyBackend(SStreamState* src, SStreamState* dst);
|
||||||
|
|
||||||
SStreamStateCur* createStreamStateCursor();
|
SStreamStateCur* createStreamStateCursor();
|
||||||
|
|
||||||
/***compare func **/
|
/***compare func **/
|
||||||
|
|
|
@ -66,6 +66,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
} else {
|
} else {
|
||||||
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
|
|
|
@ -14,8 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "storageapi.h"
|
#include "storageapi.h"
|
||||||
#include "tstreamUpdate.h"
|
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
|
#include "tstreamUpdate.h"
|
||||||
|
|
||||||
static void initStateStoreAPI(SStateStore* pStore);
|
static void initStateStoreAPI(SStateStore* pStore);
|
||||||
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
||||||
|
@ -100,9 +100,10 @@ void initStateStoreAPI(SStateStore* pStore) {
|
||||||
pStore->streamStateClose = streamStateClose;
|
pStore->streamStateClose = streamStateClose;
|
||||||
pStore->streamStateBegin = streamStateBegin;
|
pStore->streamStateBegin = streamStateBegin;
|
||||||
pStore->streamStateCommit = streamStateCommit;
|
pStore->streamStateCommit = streamStateCommit;
|
||||||
pStore->streamStateDestroy= streamStateDestroy;
|
pStore->streamStateDestroy = streamStateDestroy;
|
||||||
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
|
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
|
||||||
pStore->streamStateReloadInfo = streamStateReloadInfo;
|
pStore->streamStateReloadInfo = streamStateReloadInfo;
|
||||||
|
pStore->streamStateCopyBackend = streamStateCopyBackend;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initFunctionStateStore(SFunctionStateStore* pStore) {
|
void initFunctionStateStore(SFunctionStateStore* pStore) {
|
||||||
|
|
|
@ -13,9 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tstream.h"
|
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
typedef struct STaskUpdateEntry {
|
typedef struct STaskUpdateEntry {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -24,7 +24,7 @@ typedef struct STaskUpdateEntry {
|
||||||
} STaskUpdateEntry;
|
} STaskUpdateEntry;
|
||||||
|
|
||||||
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
|
@ -42,7 +42,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||||
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
pRunReq->streamId = 0;
|
pRunReq->streamId = 0;
|
||||||
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
|
pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID;
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
||||||
|
@ -50,10 +50,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||||
|
|
||||||
SStreamTaskNodeUpdateMsg req = {0};
|
SStreamTaskNodeUpdateMsg req = {0};
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask == NULL || *ppTask == NULL) {
|
if (ppTask == NULL || *ppTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||||
|
@ -96,7 +96,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
||||||
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
||||||
if (exist != NULL) {
|
if (exist != NULL) {
|
||||||
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
||||||
req.transId);
|
req.transId);
|
||||||
|
@ -166,7 +166,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
if (!restored) {
|
if (!restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
|
||||||
|
vgId);
|
||||||
pMeta->startInfo.tasksWillRestart = 0;
|
pMeta->startInfo.tasksWillRestart = 0;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
|
@ -238,7 +239,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamDispatchReq req = {0};
|
SStreamDispatchReq req = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||||
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -251,7 +252,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){
|
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tDeleteStreamDispatchReq(&req);
|
tDeleteStreamDispatchReq(&req);
|
||||||
|
@ -355,8 +356,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
|
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||||
pMeta->vgId, req.downstreamTaskId);
|
req.downstreamTaskId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,8 +382,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
|
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||||
pMeta->vgId, req.upstreamTaskId);
|
req.upstreamTaskId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,8 +429,9 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// only the leader node handle the check request
|
// only the leader node handle the check request
|
||||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
tqError(
|
||||||
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||||
|
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
||||||
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
||||||
} else {
|
} else {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
|
||||||
|
@ -439,13 +441,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
|
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
|
||||||
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
") task:0x%x (vgId:%d), check_status:%d",
|
||||||
|
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||||
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
||||||
") from task:0x%x (vgId:%d), rsp check_status %d",
|
") from task:0x%x (vgId:%d), rsp check_status %d",
|
||||||
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,7 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||||
|
|
||||||
if (!isLeader) {
|
if (!isLeader) {
|
||||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||||
|
@ -485,7 +488,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -496,10 +499,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SStreamCheckpointReadyMsg req = {0};
|
SStreamCheckpointReadyMsg req = {0};
|
||||||
|
|
||||||
|
@ -526,7 +529,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) {
|
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader,
|
||||||
|
bool restored) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
|
@ -538,7 +542,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
||||||
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
||||||
|
|
||||||
// 1.deserialize msg and build task
|
// 1.deserialize msg and build task
|
||||||
int32_t size = sizeof(SStreamTask);
|
int32_t size = sizeof(SStreamTask);
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
||||||
|
@ -566,7 +570,8 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
|
||||||
|
tstrerror(code));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -603,7 +608,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
@ -634,8 +639,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startStreamTasks(SStreamMeta* pMeta) {
|
int32_t startStreamTasks(SStreamMeta* pMeta) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||||
|
@ -679,7 +684,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
code = ret;
|
code = ret;
|
||||||
}
|
}
|
||||||
|
@ -692,8 +697,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
|
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
|
@ -703,7 +708,7 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
streamTaskResetStatus(*pTask);
|
streamTaskResetStatus(*pTask);
|
||||||
}
|
}
|
||||||
|
@ -716,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while(1) {
|
while (1) {
|
||||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||||
if (startVal == 0) {
|
if (startVal == 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -739,7 +744,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
streamMetaClear(pMeta);
|
streamMetaClear(pMeta);
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
|
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
|
||||||
|
|
||||||
code = streamMetaLoadAllTasks(pMeta);
|
code = streamMetaLoadAllTasks(pMeta);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -780,11 +785,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
|
||||||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &p)) {
|
if (streamTaskReadyToRun(pTask, &p)) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.nextProcessVer);
|
pTask->chkInfo.nextProcessVer);
|
||||||
streamExecTask(pTask);
|
streamExecTask(pTask);
|
||||||
} else {
|
} else {
|
||||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
@ -800,5 +805,3 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,7 @@ void initStateStoreAPI(SStateStore* pStore) {
|
||||||
pStore->streamStateDestroy = streamStateDestroy;
|
pStore->streamStateDestroy = streamStateDestroy;
|
||||||
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
|
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
|
||||||
pStore->streamStateReloadInfo = streamStateReloadInfo;
|
pStore->streamStateReloadInfo = streamStateReloadInfo;
|
||||||
|
pStore->streamStateCopyBackend = streamStateCopyBackend;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
|
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
|
||||||
|
|
|
@ -408,6 +408,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pInfo->pDelWins);
|
taosArrayDestroy(pInfo->pDelWins);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||||
|
|
||||||
|
if (pInfo->pState->dump == 1) {
|
||||||
|
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
|
||||||
|
taosMemoryFreeClear(pInfo->pState->pTdbState);
|
||||||
|
}
|
||||||
taosMemoryFreeClear(pInfo->pState);
|
taosMemoryFreeClear(pInfo->pState);
|
||||||
|
|
||||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||||
|
@ -1462,7 +1467,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
qInfo("open state %p", pInfo->pState);
|
||||||
|
pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||||
|
//*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||||
|
|
||||||
|
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||||
|
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||||
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||||
|
@ -3338,7 +3347,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
|
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
|
||||||
for (int32_t i = 0; i < rows; i += winRows) {
|
for (int32_t i = 0; i < rows; i += winRows) {
|
||||||
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
|
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
|
||||||
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) {
|
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) ||
|
||||||
|
colDataIsNull_s(pKeyColInfo, i)) {
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,8 +91,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
||||||
stDebug("open stream state, %s", path);
|
|
||||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
|
stDebug("open stream state %p, %s", pState, path);
|
||||||
if (pState == NULL) {
|
if (pState == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -211,7 +211,7 @@ int32_t streamStateDelTaskDb(SStreamState* pState) {
|
||||||
SStreamTask* pTask = pState->pTdbState->pOwner;
|
SStreamTask* pTask = pState->pTdbState->pOwner;
|
||||||
taskDbRemoveRef(pTask->pBackend);
|
taskDbRemoveRef(pTask->pBackend);
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void streamStateClose(SStreamState* pState, bool remove) {
|
void streamStateClose(SStreamState* pState, bool remove) {
|
||||||
SStreamTask* pTask = pState->pTdbState->pOwner;
|
SStreamTask* pTask = pState->pTdbState->pOwner;
|
||||||
|
@ -374,8 +374,8 @@ int32_t streamStateClear(SStreamState* pState) {
|
||||||
streamStatePut(pState, &key, NULL, 0);
|
streamStatePut(pState, &key, NULL, 0);
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
|
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
|
||||||
SWinKey delKey = {0};
|
SWinKey delKey = {0};
|
||||||
int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
|
int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
streamStateDel(pState, &delKey);
|
streamStateDel(pState, &delKey);
|
||||||
|
@ -493,7 +493,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void**
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
const SStateKey* pKTmp = NULL;
|
const SStateKey* pKTmp = NULL;
|
||||||
int32_t kLen;
|
int32_t kLen;
|
||||||
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -513,7 +513,7 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
const SWinKey* pKTmp = NULL;
|
const SWinKey* pKTmp = NULL;
|
||||||
int32_t kLen;
|
int32_t kLen;
|
||||||
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -530,7 +530,7 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
uint64_t groupId = pKey->groupId;
|
uint64_t groupId = pKey->groupId;
|
||||||
int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
|
int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pKey->groupId == groupId) {
|
if (pKey->groupId == groupId) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -555,7 +555,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -710,7 +710,8 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur,
|
||||||
|
const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
|
return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
|
||||||
#else
|
#else
|
||||||
|
@ -724,9 +725,9 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
|
||||||
#else
|
#else
|
||||||
|
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
|
||||||
SSessionKey resKey = *key;
|
SSessionKey resKey = *key;
|
||||||
void* tmp = NULL;
|
void* tmp = NULL;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (key->win.skey != resKey.win.skey) {
|
if (key->win.skey != resKey.win.skey) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -767,7 +768,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -798,7 +799,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -830,7 +831,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -854,7 +855,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SStateSessionKey* pKTmp = NULL;
|
SStateSessionKey* pKTmp = NULL;
|
||||||
int32_t kLen;
|
int32_t kLen;
|
||||||
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -874,13 +875,13 @@ int32_t streamStateSessionClear(SStreamState* pState) {
|
||||||
sessionWinStateClear(pState->pFileState);
|
sessionWinStateClear(pState->pFileState);
|
||||||
return streamStateSessionClear_rocksdb(pState);
|
return streamStateSessionClear_rocksdb(pState);
|
||||||
#else
|
#else
|
||||||
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
|
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
|
||||||
while (1) {
|
while (1) {
|
||||||
SSessionKey delKey = {0};
|
SSessionKey delKey = {0};
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
|
||||||
if (code == 0 && size > 0) {
|
if (code == 0 && size > 0) {
|
||||||
memset(buf, 0, size);
|
memset(buf, 0, size);
|
||||||
streamStateSessionPut(pState, &delKey, buf, size);
|
streamStateSessionPut(pState, &delKey, buf, size);
|
||||||
|
@ -909,14 +910,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSessionKey resKey = *key;
|
SSessionKey resKey = *key;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
|
||||||
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||||
*curKey = resKey;
|
*curKey = resKey;
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
@ -952,19 +953,19 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key,
|
||||||
return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen);
|
return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen);
|
||||||
#else
|
#else
|
||||||
// todo refactor
|
// todo refactor
|
||||||
int32_t res = 0;
|
int32_t res = 0;
|
||||||
SSessionKey originKey = *key;
|
SSessionKey originKey = *key;
|
||||||
SSessionKey searchKey = *key;
|
SSessionKey searchKey = *key;
|
||||||
searchKey.win.skey = key->win.skey - gap;
|
searchKey.win.skey = key->win.skey - gap;
|
||||||
searchKey.win.ekey = key->win.ekey + gap;
|
searchKey.win.ekey = key->win.ekey + gap;
|
||||||
int32_t valSize = *pVLen;
|
int32_t valSize = *pVLen;
|
||||||
void* tmp = tdbRealloc(NULL, valSize);
|
void* tmp = tdbRealloc(NULL, valSize);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
@ -1007,16 +1008,16 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
|
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
|
||||||
#else
|
#else
|
||||||
int32_t res = 0;
|
int32_t res = 0;
|
||||||
SSessionKey tmpKey = *key;
|
SSessionKey tmpKey = *key;
|
||||||
int32_t valSize = *pVLen;
|
int32_t valSize = *pVLen;
|
||||||
void* tmp = tdbRealloc(NULL, valSize);
|
void* tmp = tdbRealloc(NULL, valSize);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
@ -1115,6 +1116,20 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
|
||||||
|
|
||||||
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
|
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
|
||||||
|
|
||||||
|
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
|
||||||
|
dst->pFileState = src->pFileState;
|
||||||
|
dst->parNameMap = src->parNameMap;
|
||||||
|
dst->number = src->number;
|
||||||
|
dst->taskId = src->taskId;
|
||||||
|
dst->streamId = src->streamId;
|
||||||
|
if (dst->pTdbState == NULL) {
|
||||||
|
dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
|
||||||
|
dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
|
}
|
||||||
|
dst->dump = 1;
|
||||||
|
dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
|
||||||
|
return;
|
||||||
|
}
|
||||||
SStreamStateCur* createStreamStateCursor() {
|
SStreamStateCur* createStreamStateCursor() {
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
pCur->buffIndex = -1;
|
pCur->buffIndex = -1;
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
|
||||||
#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
|
#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
|
||||||
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
||||||
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
||||||
|
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
||||||
|
|
Loading…
Reference in New Issue