other: revert to 3.0
This commit is contained in:
parent
39cd95d825
commit
e75ed9f06d
|
@ -13,104 +13,193 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// clang-format off
|
#include "executor.h"
|
||||||
#include <assert.h>
|
#include "streamInc.h"
|
||||||
#include <stdio.h>
|
#include "tcommon.h"
|
||||||
#include <string.h>
|
#include "ttimer.h"
|
||||||
#include <time.h>
|
|
||||||
#include "taos.h"
|
|
||||||
|
|
||||||
int32_t init_env() {
|
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
if (pConn == NULL) {
|
if (pState == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
char statePath[300];
|
||||||
|
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||||
|
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open state storage backend
|
||||||
|
if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamStateBegin(pState) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pState->pOwner = pTask;
|
||||||
|
|
||||||
|
return pState;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
if (pState->pStateDb) tdbTbClose(pState->pStateDb);
|
||||||
|
if (pState->db) tdbClose(pState->db);
|
||||||
|
taosMemoryFree(pState);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamStateClose(SStreamState* pState) {
|
||||||
|
tdbCommit(pState->db, &pState->txn);
|
||||||
|
tdbTbClose(pState->pStateDb);
|
||||||
|
tdbClose(pState->db);
|
||||||
|
|
||||||
|
taosMemoryFree(pState);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateBegin(SStreamState* pState) {
|
||||||
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
if (taos_errno(pRes) != 0) {
|
tdbTxnClose(&pState->txn);
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "use abc1");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t create_stream() {
|
int32_t streamStateCommit(SStreamState* pState) {
|
||||||
printf("create stream\n");
|
if (tdbCommit(pState->db, &pState->txn) < 0) {
|
||||||
TAOS_RES* pRes;
|
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
if (pConn == NULL) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
memset(&pState->txn, 0, sizeof(TXN));
|
||||||
pRes = taos_query(pConn, "use abc1");
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
if (taos_errno(pRes) != 0) {
|
0) {
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
|
|
||||||
pRes = taos_query(pConn,
|
|
||||||
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
|
||||||
taos_close(pConn);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int32_t streamStateAbort(SStreamState* pState) {
|
||||||
int code;
|
if (tdbAbort(pState->db, &pState->txn) < 0) {
|
||||||
if (argc > 1) {
|
return -1;
|
||||||
printf("env init\n");
|
|
||||||
code = init_env();
|
|
||||||
}
|
}
|
||||||
create_stream();
|
memset(&pState->txn, 0, sizeof(TXN));
|
||||||
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||||
|
return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
|
||||||
|
}
|
||||||
|
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||||
|
return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
|
||||||
|
return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) return NULL;
|
||||||
|
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
|
||||||
|
if (c != 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||||
|
const SWinKey* pKTmp = NULL;
|
||||||
|
int32_t kLen;
|
||||||
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*pKey = *pKTmp;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToFirst(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToLast(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c < 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToNext(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToPrev(pCur->pCur);
|
||||||
|
}
|
||||||
|
void streamStateFreeCur(SStreamStateCur* pCur) {
|
||||||
|
tdbTbcClose(pCur->pCur);
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamFreeVal(void* val) { tdbFree(val); }
|
||||||
|
|
Loading…
Reference in New Issue