refactor stream backend

This commit is contained in:
yihaoDeng 2023-10-24 11:53:52 +08:00
parent 986a502019
commit d46cf878eb
3 changed files with 31 additions and 11 deletions

View File

@ -1120,10 +1120,11 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
}
static int32_t mndCheckNodeStatus(SMnode *pMnode) {
bool ready = true;
bool ready = false;
// check if the node update happens or not
int64_t ts = taosGetTimestampSec();
taosThreadMutexLock(&execNodeList.lock);
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
@ -1135,14 +1136,14 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execNodeList.ts = ts;
return -1;
goto _EXIT;
}
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return -1;
goto _EXIT;
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
@ -1155,13 +1156,12 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
if (nodeUpdated) {
mDebug("stream task not ready due to node update, checkpoint not issued");
return -1;
goto _EXIT;
}
}
// check if all tasks are in TASK_STATUS__NORMAL status
taosThreadMutexLock(&execNodeList.lock);
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
@ -1176,6 +1176,9 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
break;
}
}
ready = true;
_EXIT:
taosThreadMutexUnlock(&execNodeList.lock);
return ready == true ? 0 : -1;
}

View File

@ -2637,6 +2637,9 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
if (value == NULL || vLen == 0) {
stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
}
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
return code;
}
@ -2685,11 +2688,11 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
if (code != 0) {
return NULL;
}
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor();
SStreamStateCur* pCur = createStreamStateCursor();
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);

View File

@ -706,9 +706,23 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
int32_t code = TSDB_CODE_SUCCESS;
SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) {
if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
if (!pos->pRowBuff) {
return code;
}
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true);
putFreeBuff(pState->pFileState, pos);
stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
} else {
code = putSessionWinResultBuff(pState->pFileState, value);
}
}
return code;
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,