diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 1d40274504..6ab650d5d6 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -38,8 +38,8 @@ extern "C" { #define META_READER_NOLOCK 0x1 -#define STREAM_STATE_BUFF_HASH 1 -#define STREAM_STATE_BUFF_SORT 2 +#define STREAM_STATE_BUFF_HASH 1 +#define STREAM_STATE_BUFF_SORT 2 typedef struct SMeta SMeta; typedef TSKEY (*GetTsFun)(void*); @@ -102,14 +102,14 @@ typedef struct SMTbCursor { } SMTbCursor; typedef struct SMCtbCursor { - SMeta *pMeta; - void *pCur; + SMeta* pMeta; + void* pCur; tb_uid_t suid; - void *pKey; - void *pVal; + void* pKey; + void* pVal; int kLen; int vLen; - int8_t paused; + int8_t paused; int lock; } SMCtbCursor; @@ -248,22 +248,23 @@ typedef struct SStoreMeta { void* (*storeGetIndexInfo)(); void* (*getInvertIndex)(void* pVnode); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); + int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list); int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); void* storeGetVersionRange; void* storeGetLastTimestamp; int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema - int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); - void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); + int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, + int64_t* numOfNormalTables); int64_t (*getNumOfRowsInMem)(void* pVnode); - SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); - int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); - void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); - void (*closeCtbCursor)(SMCtbCursor *pCtbCur); - tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); + SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock); + int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); + void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); + void (*closeCtbCursor)(SMCtbCursor* pCtbCur); + tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); } SStoreMeta; typedef struct SStoreMetaReader { @@ -348,14 +349,14 @@ typedef struct SStateStore { const SSessionKey* pKey, void** pVal, int32_t* pVLen); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); - TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); - bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); - bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); + bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); + bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); void (*updateInfoDestroy)(SUpdateInfo* pInfo); - void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); - void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); + void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count); + void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); @@ -382,6 +383,7 @@ typedef struct SStateStore { void (*streamStateDestroy)(SStreamState* pState, bool remove); int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); + void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst); } SStateStore; typedef struct SStorageAPI { diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ba90c0dc7a..24222677a4 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -50,7 +50,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); -//session window +// session window int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); @@ -65,7 +65,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key); -//state window +// state window int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); @@ -96,6 +96,9 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); void streamStateReloadInfo(SStreamState* pState, TSKEY ts); + +void streamStateCopyBackend(SStreamState* src, SStreamState* dst); + SStreamStateCur* createStreamStateCursor(); /***compare func **/ diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 380be1dd38..21d813c7c0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -66,6 +66,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer } else { sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index d1e0aefca0..c605a8373e 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -14,8 +14,8 @@ */ #include "storageapi.h" -#include "tstreamUpdate.h" #include "streamState.h" +#include "tstreamUpdate.h" static void initStateStoreAPI(SStateStore* pStore); static void initFunctionStateStore(SFunctionStateStore* pStore); @@ -100,9 +100,10 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateClose = streamStateClose; pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; - pStore->streamStateDestroy= streamStateDestroy; + pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; + pStore->streamStateCopyBackend = streamStateCopyBackend; } void initFunctionStateStore(SFunctionStateStore* pStore) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b1d49bf31b..1be2e7b79c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -736,15 +736,15 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + // code = streamMetaReopen(pMeta); + // if (code != TSDB_CODE_SUCCESS) { + // tqError("vgId:%d failed to reopen stream meta", vgId); + // streamMetaWUnLock(pMeta); + // code = terrno; + // return code; + // } - streamMetaInitBackend(pMeta); + // streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 6584b7072f..f0bc32c853 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -212,6 +212,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; + pStore->streamStateCopyBackend = streamStateCopyBackend; } void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3dfc92d953..8cd441798e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1465,7 +1465,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + // qError("open state %p", pInfo->pState); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + // qError("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -3341,7 +3343,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < rows; i += winRows) { if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, - &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) { + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || + colDataIsNull_s(pKeyColInfo, i)) { i++; continue; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 276ed08785..29f6600e16 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -91,8 +91,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { - stDebug("open stream state, %s", path); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + stDebug("open stream state %p, %s", pState, path); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -211,7 +211,7 @@ int32_t streamStateDelTaskDb(SStreamState* pState) { SStreamTask* pTask = pState->pTdbState->pOwner; taskDbRemoveRef(pTask->pBackend); taosMemoryFree(pTask); - return 0; + return 0; } void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; @@ -374,8 +374,8 @@ int32_t streamStateClear(SStreamState* pState) { streamStatePut(pState, &key, NULL, 0); while (1) { SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); - SWinKey delKey = {0}; - int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); streamStateFreeCur(pCur); if (code == 0) { streamStateDel(pState, &delKey); @@ -493,7 +493,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** return -1; } const SStateKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -513,7 +513,7 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo return -1; } const SWinKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -530,7 +530,7 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v return -1; } uint64_t groupId = pKey->groupId; - int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); + int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); if (code == 0) { if (pKey->groupId == groupId) { return 0; @@ -555,7 +555,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key } SStateKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -710,7 +710,8 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void #endif } -int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, + const SSessionKey* pKey, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen); #else @@ -724,9 +725,9 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa #else SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { if (key->win.skey != resKey.win.skey) { code = -1; @@ -767,7 +768,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -798,7 +799,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -830,7 +831,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -854,7 +855,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v return -1; } SStateSessionKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { return -1; } @@ -874,13 +875,13 @@ int32_t streamStateSessionClear(SStreamState* pState) { sessionWinStateClear(pState->pFileState); return streamStateSessionClear_rocksdb(pState); #else - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); while (1) { SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); streamStateSessionPut(pState, &delKey, buf, size); @@ -909,14 +910,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return -1; } SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -952,19 +953,19 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen); #else // todo refactor - int32_t res = 0; + int32_t res = 0; SSessionKey originKey = *key; SSessionKey searchKey = *key; searchKey.win.skey = key->win.skey - gap; searchKey.win.ekey = key->win.ekey + gap; int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); @@ -1007,16 +1008,16 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch #ifdef USE_ROCKSDB return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen); #else - int32_t res = 0; + int32_t res = 0; SSessionKey tmpKey = *key; - int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + int32_t valSize = *pVLen; + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -1115,6 +1116,10 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } +void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { + dst->pTdbState->pOwner = src->pTdbState->pOwner; + return; +} SStreamStateCur* createStreamStateCursor() { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); pCur->buffIndex = -1;