diff --git a/cmake/cmake.define b/cmake/cmake.define
index 1500858d9f..f55a9bdabc 100644
--- a/cmake/cmake.define
+++ b/cmake/cmake.define
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.0)
-set(CMAKE_VERBOSE_MAKEFILE ON)
+set(CMAKE_VERBOSE_MAKEFILE OFF)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index 63e9e3799a..5b125b42d4 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -20,13 +20,13 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#ifdef __cplusplus
extern "C" {
#endif
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 865977d62b..c7e55650cd 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,16 +13,13 @@
* along with this program. If not, see .
*/
-#include "executor.h"
#include "os.h"
-#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
-#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
- SArray* pTaskList; // SArray
+ SArray* pTaskList; // SArray
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
+void streamMetaInit();
+void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
index d884120147..544512233e 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
@@ -18,6 +18,7 @@
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
+#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
indexInit(tsNumOfCommitThreads);
+ streamMetaInit();
dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode);
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
+ streamMetaCleanup();
indexCleanup();
taosConvDestroy();
dDebug("dnode is closed, ptr:%p", pDnode);
diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h
index 5d2970a4b7..0f39cf817b 100644
--- a/source/libs/stream/inc/streamBackendRocksdb.h
+++ b/source/libs/stream/inc/streamBackendRocksdb.h
@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
-#include "executor.h"
-
#include "rocksdb/c.h"
// #include "streamInc.h"
#include "streamState.h"
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
-void* streamStateCreateBatch();
-int32_t streamStateGetBatchSize(void* pBatch);
-void streamStateClearBatch(void* pBatch);
-void streamStateDestroyBatch(void* pBatch);
-int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
-int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
-
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
+ void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h
index 71fbe5e086..c471bc2bd8 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInc.h
@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
-//#include "executor.h"
+#include "executor.h"
+#include "query.h"
#include "tstream.h"
+#include "trpc.h"
+
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index db4ec17b19..16ba81c74a 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -13,8 +13,9 @@
* along with this program. If not, see .
*/
-// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
+#include "executor.h"
+#include "query.h"
#include "tcommon.h"
typedef struct SCompactFilteFactory {
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear(err);
}
} else {
+ /*
+ list all cf and get prefix
+ */
int64_t streamId;
int32_t taskId, dummpy = 0;
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
- // int64_t unixTime = taosGetTimestampMs();
- if (streamStateValueIsStale((char*)val)) {
- return 1;
- }
- // SStreamValue value;
- // memset(&value, 0, sizeof(value));
- // streamValueDecode(&value, (char*)val);
- // taosMemoryFree(value.data);
- // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
- // return 1;
- // }
- return 0;
+ return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteName(void* arg) { return "stream_filte"; }
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy(cfNames[0], "default", strlen("default"));
continue;
}
- qError("cf name %s", idstr);
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
if (i % cfLen == 0) {
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
}
}
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- qError("cf name %s", cfNames[i]);
- }
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
for (int i = 0; i < nSize * cfLen + 1; i++) {
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
- // return -1;
}
}
pState->pTdbState->rocksdb = handle->db;
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
-#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
- do { \
- code = 0; \
- char buf[128] = {0}; \
- char* err = NULL; \
- int i = streamGetInit(funcname); \
- if (i < 0) { \
- qWarn("streamState failed to get cf name: %s", funcname); \
- code = -1; \
- break; \
- } \
- char toString[128] = {0}; \
- if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
- int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
- size_t len = 0; \
- char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
- if (val == NULL) { \
- qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
- if (err != NULL) taosMemoryFree(err); \
- code = -1; \
- } else { \
- char * p = NULL, *end = NULL; \
- int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
- if (len < 0) { \
- qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
- code = -1; \
- } else { \
- qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
- } \
- if (pVal != NULL) { \
- *pVal = p; \
- } else { \
- taosMemoryFree(p); \
- } \
- taosMemoryFree(val); \
- if (vLen != NULL) *vLen = len; \
- } \
- if (err != NULL) { \
- taosMemoryFree(err); \
- qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
- code = -1; \
- } else { \
- if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
- } \
+#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
+ do { \
+ code = 0; \
+ char buf[128] = {0}; \
+ char* err = NULL; \
+ int i = streamGetInit(funcname); \
+ if (i < 0) { \
+ qWarn("streamState failed to get cf name: %s", funcname); \
+ code = -1; \
+ break; \
+ } \
+ char toString[128] = {0}; \
+ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
+ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
+ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
+ size_t len = 0; \
+ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
+ if (val == NULL) { \
+ if (err == NULL) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
+ funcname); \
+ } else { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
+ err); \
+ taosMemoryFreeClear(err); \
+ } \
+ code = -1; \
+ } else { \
+ char* p = NULL; \
+ int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
+ if (len < 0) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
+ funcname); \
+ code = -1; \
+ } else { \
+ qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
+ len); \
+ } \
+ taosMemoryFree(val); \
+ if (vLen != NULL) *vLen = len; \
+ } \
+ if (code == 0) \
+ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) {
- qWarn(
- "failed to delete range cf(state) err: %s, "
- "start: %s, end:%s",
- err, toStringStart, toStringEnd);
+ qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
}
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1;
}
- size_t tlen;
- char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
+ size_t klen, vlen;
+ char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
winKeyDecode(&winKey, keyStr);
- size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
- char* dst = NULL;
- int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
+ // char* dst = NULL;
+ int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) {
return -1;
}
-
- if (pVal != NULL) *pVal = (char*)dst;
- if (pVLen != NULL) *pVLen = vlen;
+ if (pVLen != NULL) *pVLen = len;
*pKey = winKey;
return 0;
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen) {
+ void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(cfName);
if (i < 0) {
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
- int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
+ int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 0fb78fb589..5f9aec4db0 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
}
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
- qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
- pSubmit->submit.msgLen, pSubmit->submit.ver);
+ qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
+ pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
- qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
+ qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
- taosFreeQitem(pRes);
+ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+ taosFreeQitem(qRes);
return code;
}
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0;
int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
- if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
+ if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
- pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
+ pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index de56cf24ca..682ce08c7f 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -19,6 +19,13 @@
#include "tref.h"
#include "ttimer.h"
+static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
+static int32_t streamBackendId = 0;
+static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
+
+void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
+void streamMetaCleanup() { taosCloseRef(streamBackendId); }
+
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
- taosMemoryFree(streamPath);
goto _err;
}
+ memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
+ pMeta->streamBackendId = streamBackendId;
- char* statePath = taosMemoryCalloc(1, len);
- sprintf(statePath, "%s/%s", pMeta->path, "state");
- code = taosMulModeMkDir(statePath, 0755);
+ memset(streamPath, 0, len);
+ sprintf(streamPath, "%s/%s", pMeta->path, "state");
+ code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- pMeta->streamBackend = streamBackendInit(statePath);
- pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup);
- pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
+ pMeta->streamBackend = streamBackendInit(streamPath);
+ pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
- taosMemoryFree(statePath);
+ taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock);
return pMeta;
_err:
+ taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup(pMeta->pTasks);
- taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid);
- // streamBackendCleanup(pMeta->streamBackend);
- taosCloseRef(pMeta->streamBackendId);
+ taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
-
- // taosWLockLatch(&pMeta->lock);
-
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
- //
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 1cca4d55cf..373cb27941 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState->taskId = pTask->id.taskId;
pState->streamId = pTask->id.streamId;
#ifdef USE_ROCKSDB
- qWarn("open stream state1");
+ // qWarn("open stream state1");
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
if (code == -1) {
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
+ taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db);
#endif
- taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
}
int32_t streamStateBegin(SStreamState* pState) {
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t code = 0;
void* batch = streamStateCreateBatch();
- code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
+ code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) {
return code;
}
diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c
index b7401ec5d9..67835e77b8 100644
--- a/source/libs/stream/src/tstreamFileState.c
+++ b/source/libs/stream/src/tstreamFileState.c
@@ -15,6 +15,7 @@
#include "tstreamFileState.h"
+#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true);
}
-bool needClearDiskBuff(SStreamFileState* pFileState) {
- return pFileState->flushMark > 0;
-}
+bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
- clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs;
}
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
- code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
+ code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
}
if (streamStateGetBatchSize(batch) > 0) {
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf);
}
{
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
- deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ deleteExpiredCheckPoint(pFileState, mark);
void* pStVal = NULL;
int32_t len = 0;
diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt
index 5a97ba45f6..a0c1717690 100644
--- a/source/libs/stream/test/CMakeLists.txt
+++ b/source/libs/stream/test/CMakeLists.txt
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(
streamUpdateTest
- PUBLIC os util common gtest stream
+ PUBLIC os util common gtest gtest_main stream
)
TARGET_INCLUDE_DIRECTORIES(
diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp
index c698187874..18c60aff28 100644
--- a/source/libs/stream/test/tstreamUpdateTest.cpp
+++ b/source/libs/stream/test/tstreamUpdateTest.cpp
@@ -1,11 +1,28 @@
#include
+#include "streamBackendRocksdb.h"
+#include "tstream.h"
#include "tstreamUpdate.h"
#include "ttime.h"
using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
+class StreamStateEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ streamMetaInit();
+ backend = streamBackendInit(path);
+ }
+ virtual void TearDown() {
+ streamMetaCleanup();
+ // indexClose(index);
+ }
+
+ const char *path = TD_TMP_DIR_PATH "stream";
+ void *backend;
+};
+
bool equalSBF(SScalableBf *left, SScalableBf *right) {
if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false;
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7);
}
-
-int main(int argc, char *argv[]) {
- testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
\ No newline at end of file
+// TEST()
+TEST(StreamStateEnv, test1) {}
+// int main(int argc, char *argv[]) {
+// testing::InitGoogleTest(&argc, argv);
+// return RUN_ALL_TESTS();
+// }
\ No newline at end of file