diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1c9d11b755..dfd6f012cc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -13,104 +13,193 @@ * along with this program. If not, see . */ -// clang-format off -#include -#include -#include -#include -#include "taos.h" +#include "executor.h" +#include "streamInc.h" +#include "tcommon.h" +#include "ttimer.h" -int32_t init_env() { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { +SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (pState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char statePath[300]; + sprintf(statePath, "%s/%d", path, pTask->taskId); + if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + if (streamStateBegin(pState) < 0) { + goto _err; + } + + pState->pOwner = pTask; + + return pState; + +_err: + if (pState->pStateDb) tdbTbClose(pState->pStateDb); + if (pState->db) tdbClose(pState->db); + taosMemoryFree(pState); + return NULL; +} + +void streamStateClose(SStreamState* pState) { + tdbCommit(pState->db, &pState->txn); + tdbTbClose(pState->pStateDb); + tdbClose(pState->db); + + taosMemoryFree(pState); +} + +int32_t streamStateBegin(SStreamState* pState) { + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); + if (tdbBegin(pState->db, &pState->txn) < 0) { + tdbTxnClose(&pState->txn); return -1; } - taos_free_result(pRes); - -#if 0 - pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); -#endif - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - return 0; } -int32_t create_stream() { - printf("create stream\n"); - TAOS_RES* pRes; - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { +int32_t streamStateCommit(SStreamState* pState) { + if (tdbCommit(pState->db, &pState->txn) < 0) { return -1; } - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { return -1; } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); - if (taos_errno(pRes) != 0) { - printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); + if (tdbBegin(pState->db, &pState->txn) < 0) { return -1; } - taos_free_result(pRes); - taos_close(pConn); return 0; } -int main(int argc, char* argv[]) { - int code; - if (argc > 1) { - printf("env init\n"); - code = init_env(); +int32_t streamStateAbort(SStreamState* pState) { + if (tdbAbort(pState->db, &pState->txn) < 0) { + return -1; } - create_stream(); + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; } + +int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); +} +int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen); +} + +int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { + return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn); +} + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + + int32_t c; + tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); + if (c != 0) { + taosMemoryFree(pCur); + return NULL; + } + return pCur; +} + +int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + const SWinKey* pKTmp = NULL; + int32_t kLen; + if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { + return -1; + } + *pKey = *pKTmp; + return 0; +} + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToFirst(pCur->pCur); +} + +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToLast(pCur->pCur); +} + +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c < 0) return pCur; + + if (tdbTbcMoveToPrev(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToNext(pCur->pCur); +} + +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToPrev(pCur->pCur); +} +void streamStateFreeCur(SStreamStateCur* pCur) { + tdbTbcClose(pCur->pCur); + taosMemoryFree(pCur); +} + +void streamFreeVal(void* val) { tdbFree(val); }