commit
e9b7810d40
|
@ -163,6 +163,7 @@ typedef struct {
|
||||||
int64_t checkPointId;
|
int64_t checkPointId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
int64_t streamBackendRid;
|
||||||
} SStreamState;
|
} SStreamState;
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
|
|
|
@ -344,7 +344,6 @@ typedef struct SStreamMeta {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int32_t walScanCounter;
|
int32_t walScanCounter;
|
||||||
void* streamBackend;
|
void* streamBackend;
|
||||||
int32_t streamBackendId;
|
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
SHashObj* pTaskBackendUnique;
|
SHashObj* pTaskBackendUnique;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
|
@ -22,21 +22,20 @@ extern "C" {
|
||||||
|
|
||||||
// If the error is in a third-party library, place this header file under the third-party library header file.
|
// If the error is in a third-party library, place this header file under the third-party library header file.
|
||||||
// When you want to use this feature, you should find or add the same function in the following sectio
|
// When you want to use this feature, you should find or add the same function in the following sectio
|
||||||
// #if !defined(WINDOWS)
|
#if !defined(WINDOWS)
|
||||||
|
|
||||||
// #ifndef ALLOW_FORBID_FUNC
|
#ifndef ALLOW_FORBID_FUNC
|
||||||
// #define malloc MALLOC_FUNC_TAOS_FORBID
|
#define malloc MALLOC_FUNC_TAOS_FORBID
|
||||||
// #define calloc CALLOC_FUNC_TAOS_FORBID
|
#define calloc CALLOC_FUNC_TAOS_FORBID
|
||||||
// #define realloc REALLOC_FUNC_TAOS_FORBID
|
#define realloc REALLOC_FUNC_TAOS_FORBID
|
||||||
// #define free FREE_FUNC_TAOS_FORBID
|
#define free FREE_FUNC_TAOS_FORBID
|
||||||
// #ifdef strdup
|
#ifdef strdup
|
||||||
// #undef strdup
|
#undef strdup
|
||||||
// #define strdup STRDUP_FUNC_TAOS_FORBID
|
#define strdup STRDUP_FUNC_TAOS_FORBID
|
||||||
// #endif
|
#endif
|
||||||
// #endif // ifndef ALLOW_FORBID_FUNC
|
#endif // ifndef ALLOW_FORBID_FUNC
|
||||||
// #endif // if !defined(WINDOWS)
|
#endif // if !defined(WINDOWS)
|
||||||
|
|
||||||
// // #define taosMemoryFree malloc
|
|
||||||
// #define taosMemoryMalloc malloc
|
// #define taosMemoryMalloc malloc
|
||||||
// #define taosMemoryCalloc calloc
|
// #define taosMemoryCalloc calloc
|
||||||
// #define taosMemoryRealloc realloc
|
// #define taosMemoryRealloc realloc
|
||||||
|
|
|
@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv;
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||||
|
|
||||||
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||||
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
|
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
|
||||||
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
SArray* pRes);
|
||||||
|
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
|
||||||
|
@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
|
||||||
|
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||||
|
|
||||||
|
extern int32_t streamBackendId;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "streamInc.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
#include "tref.h"
|
||||||
|
|
||||||
typedef struct SCompactFilteFactory {
|
typedef struct SCompactFilteFactory {
|
||||||
void* status;
|
void* status;
|
||||||
|
@ -79,8 +81,8 @@ const char* compareParKeyName(void* name);
|
||||||
const char* comparePartagKeyName(void* name);
|
const char* comparePartagKeyName(void* name);
|
||||||
|
|
||||||
void* streamBackendInit(const char* path) {
|
void* streamBackendInit(const char* path) {
|
||||||
qDebug("init stream backend");
|
qDebug("start to init stream backend at %s", path);
|
||||||
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
|
SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
|
||||||
pHandle->list = tdListNew(sizeof(SCfComparator));
|
pHandle->list = tdListNew(sizeof(SCfComparator));
|
||||||
taosThreadMutexInit(&pHandle->mutex, NULL);
|
taosThreadMutexInit(&pHandle->mutex, NULL);
|
||||||
taosThreadMutexInit(&pHandle->cfMutex, NULL);
|
taosThreadMutexInit(&pHandle->cfMutex, NULL);
|
||||||
|
@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) {
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
|
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
|
@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) {
|
||||||
if (cfs != NULL) {
|
if (cfs != NULL) {
|
||||||
rocksdb_list_column_families_destroy(cfs, nCf);
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
}
|
}
|
||||||
|
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
|
||||||
|
|
||||||
return (void*)pHandle;
|
return (void*)pHandle;
|
||||||
_EXIT:
|
_EXIT:
|
||||||
|
@ -140,7 +144,8 @@ _EXIT:
|
||||||
taosHashCleanup(pHandle->cfInst);
|
taosHashCleanup(pHandle->cfInst);
|
||||||
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
||||||
tdListFree(pHandle->list);
|
tdListFree(pHandle->list);
|
||||||
free(pHandle);
|
taosMemoryFree(pHandle);
|
||||||
|
qDebug("failed to init stream backend at %s", path);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void streamBackendCleanup(void* arg) {
|
void streamBackendCleanup(void* arg) {
|
||||||
|
@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) {
|
||||||
rocksdb_env_destroy(pHandle->env);
|
rocksdb_env_destroy(pHandle->env);
|
||||||
rocksdb_cache_destroy(pHandle->cache);
|
rocksdb_cache_destroy(pHandle->cache);
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pHandle->mutex);
|
|
||||||
SListNode* head = tdListPopHead(pHandle->list);
|
SListNode* head = tdListPopHead(pHandle->list);
|
||||||
while (head != NULL) {
|
while (head != NULL) {
|
||||||
streamStateDestroyCompar(head->data);
|
streamStateDestroyCompar(head->data);
|
||||||
taosMemoryFree(head);
|
taosMemoryFree(head);
|
||||||
head = tdListPopHead(pHandle->list);
|
head = tdListPopHead(pHandle->list);
|
||||||
}
|
}
|
||||||
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
|
||||||
tdListFree(pHandle->list);
|
tdListFree(pHandle->list);
|
||||||
|
taosThreadMutexDestroy(&pHandle->mutex);
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pHandle->cfMutex);
|
taosThreadMutexDestroy(&pHandle->cfMutex);
|
||||||
|
|
||||||
taosMemoryFree(pHandle);
|
taosMemoryFree(pHandle);
|
||||||
|
qDebug("destroy stream backend backend:%p", pHandle);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
||||||
|
@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
|
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
|
||||||
|
taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||||
SBackendHandle* handle = backend;
|
SBackendHandle* handle = backend;
|
||||||
|
|
||||||
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
|
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
|
||||||
|
@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
||||||
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
||||||
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
||||||
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
|
qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
taosThreadMutexUnlock(&pHandle->cfMutex);
|
taosThreadMutexUnlock(&pHandle->cfMutex);
|
||||||
|
|
||||||
char* status[] = {"close", "drop"};
|
char* status[] = {"close", "drop"};
|
||||||
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId,
|
qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle,
|
||||||
pState->taskId);
|
pState->streamId, pState->taskId);
|
||||||
if (pState->pTdbState->rocksdb == NULL) {
|
if (pState->pTdbState->rocksdb == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
|
|
||||||
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
|
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
|
||||||
pState->pTdbState->rocksdb = NULL;
|
pState->pTdbState->rocksdb = NULL;
|
||||||
|
taosReleaseRef(streamBackendId, pState->streamBackendRid);
|
||||||
}
|
}
|
||||||
void streamStateDestroyCompar(void* arg) {
|
void streamStateDestroyCompar(void* arg) {
|
||||||
SCfComparator* comp = (SCfComparator*)arg;
|
SCfComparator* comp = (SCfComparator*)arg;
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t streamBackendId = 0;
|
int32_t streamBackendId = 0;
|
||||||
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
|
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
|
||||||
|
|
||||||
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||||
|
@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->vgId = vgId;
|
pMeta->vgId = vgId;
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->expandFunc = expandFunc;
|
||||||
pMeta->streamBackendId = streamBackendId;
|
|
||||||
|
|
||||||
memset(streamPath, 0, len);
|
memset(streamPath, 0, len);
|
||||||
sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
||||||
|
@ -90,6 +89,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->streamBackend = streamBackendInit(streamPath);
|
pMeta->streamBackend = streamBackendInit(streamPath);
|
||||||
|
if (pMeta->streamBackend == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||||
|
|
||||||
taosMemoryFree(streamPath);
|
taosMemoryFree(streamPath);
|
||||||
|
|
|
@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pStreamTask = pTask;
|
SStreamTask* pStreamTask = pTask;
|
||||||
char statePath[1024];
|
char statePath[1024];
|
||||||
if (!specPath) {
|
if (!specPath) {
|
||||||
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
|
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
|
||||||
|
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
SStreamMeta* pMeta = pStreamTask->pMeta;
|
SStreamMeta* pMeta = pStreamTask->pMeta;
|
||||||
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
|
pState->streamBackendRid = pMeta->streamBackendRid;
|
||||||
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
|
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
|
||||||
if (code == -1) {
|
if (code == -1) {
|
||||||
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
|
taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
|
||||||
taosMemoryFree(pState);
|
taosMemoryFree(pState);
|
||||||
pState = NULL;
|
pState = NULL;
|
||||||
}
|
}
|
||||||
|
@ -222,9 +222,7 @@ _err:
|
||||||
void streamStateClose(SStreamState* pState, bool remove) {
|
void streamStateClose(SStreamState* pState, bool remove) {
|
||||||
SStreamTask* pTask = pState->pTdbState->pOwner;
|
SStreamTask* pTask = pState->pTdbState->pOwner;
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
// streamStateCloseBackend(pState);
|
|
||||||
streamStateDestroy(pState, remove);
|
streamStateDestroy(pState, remove);
|
||||||
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
|
||||||
#else
|
#else
|
||||||
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
||||||
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
||||||
|
@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) {
|
||||||
|
|
||||||
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
||||||
memcpy(buf + len - rowSize, value, vLen);
|
memcpy(buf + len - rowSize, value, vLen);
|
||||||
return code;
|
return code;
|
||||||
|
@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
||||||
}
|
}
|
||||||
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
||||||
*ppVal = buf + len - rowSize;
|
*ppVal = buf + len - rowSize;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
||||||
if (code != 0 || len == 0 || val == NULL) {
|
if (code != 0 || len == 0 || val == NULL) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
memcpy(val, buf, len);
|
memcpy(buf, val, len);
|
||||||
buf[len] = 0;
|
buf[len] = 0;
|
||||||
maxCheckPointId = atol((char*)buf);
|
maxCheckPointId = atol((char*)buf);
|
||||||
taosMemoryFree(val);
|
taosMemoryFree(val);
|
||||||
|
@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
memcpy(val, buf, len);
|
memcpy(buf, val, len);
|
||||||
buf[len] = 0;
|
buf[len] = 0;
|
||||||
taosMemoryFree(val);
|
taosMemoryFree(val);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue