merge rocksdb inst
This commit is contained in:
parent
984a69671e
commit
c4f36d32ae
|
@ -381,7 +381,7 @@ const char* compareParKeyName(void* name) { return ginitDict[5].key; }
|
||||||
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
|
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
|
||||||
|
|
||||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
qError("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId);
|
qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId);
|
||||||
SBackendHandle* handle = backend;
|
SBackendHandle* handle = backend;
|
||||||
|
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
@ -437,12 +437,11 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
||||||
|
|
||||||
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
||||||
qError("end to open backend, %p", pState);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
char* status[] = {"remove", "drop"};
|
char* status[] = {"remove", "drop"};
|
||||||
qError("start to %s backend, %p, %d-%d", status[remove == false ? 1 : 0], pState, pState->streamId, pState->taskId);
|
qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 1 : 0], pState, pState->streamId, pState->taskId);
|
||||||
if (pState->pTdbState->rocksdb == NULL) {
|
if (pState->pTdbState->rocksdb == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -723,49 +722,6 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
|
||||||
SStreamStateCur* pCur = iter;
|
SStreamStateCur* pCur = iter;
|
||||||
return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len);
|
return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len);
|
||||||
}
|
}
|
||||||
// typedef struct {
|
|
||||||
// char* start;
|
|
||||||
// char* end;
|
|
||||||
// void* result;
|
|
||||||
// } StreamFilterArg;
|
|
||||||
|
|
||||||
// typedef int (*streamfilterFunc)(StreamFilterArg* arg);
|
|
||||||
|
|
||||||
// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) {
|
|
||||||
// int code = 0;
|
|
||||||
// char* err = NULL;
|
|
||||||
|
|
||||||
// rocksdb_snapshot_t* snapshot = NULL;
|
|
||||||
// rocksdb_readoptions_t* readopts = NULL;
|
|
||||||
// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
|
|
||||||
// if (pIter == NULL) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
// char* start = arg->start;
|
|
||||||
// char* end = arg->end;
|
|
||||||
// SArray* result = arg->result;
|
|
||||||
|
|
||||||
// rocksdb_iter_seek(pIter, start, strlen(start));
|
|
||||||
// while (rocksdb_iter_valid(pIter)) {
|
|
||||||
// const char* key = rocksdb_iter_key(pIter, NULL);
|
|
||||||
// if (end != NULL && strcmp(key, end) > 0) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
|
|
||||||
// int64_t checkPoint = 0;
|
|
||||||
// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
|
|
||||||
// // taosArrayPush(result, &checkPoint);
|
|
||||||
// // }
|
|
||||||
// } else {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// rocksdb_iter_next(pIter);
|
|
||||||
// }
|
|
||||||
// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
|
|
||||||
// rocksdb_readoptions_destroy(readopts);
|
|
||||||
// rocksdb_iter_destroy(pIter);
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
|
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
@ -860,43 +816,6 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
// del one by one
|
|
||||||
|
|
||||||
// char buf[128] = {0};
|
|
||||||
// char* s = "null";
|
|
||||||
// SWinKey key = {.ts = 0, .groupId = 0};
|
|
||||||
// SStateKey skey = {.key = key, .opNum = pState->number};
|
|
||||||
// int sLen = stateKeyEncode(&skey, buf);
|
|
||||||
|
|
||||||
// streamStatePut_rocksdb(pState, &key, s, strlen(s));
|
|
||||||
|
|
||||||
// rocksdb_readoptions_t* opt = NULL;
|
|
||||||
// rocksdb_iterator_t* iter = streamStateIterCreate(pState, "state", NULL, &opt);
|
|
||||||
// rocksdb_iter_seek(iter, buf, sLen);
|
|
||||||
|
|
||||||
// char* err = NULL;
|
|
||||||
// while (rocksdb_iter_valid(iter)) {
|
|
||||||
// int32_t kLen = 0;
|
|
||||||
// char* key = (char*)rocksdb_iter_key(iter, (size_t*)&kLen);
|
|
||||||
|
|
||||||
// SStateKey ckey = {0};
|
|
||||||
// stateKeyDecode((void*)&ckey, key);
|
|
||||||
// if (ckey.opNum != pState->number) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// if (stateKeyCmpr(&skey, sizeof(skey), &ckey, sizeof(ckey)) > 0) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// rocksdb_delete_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], key,
|
|
||||||
// kLen, &err);
|
|
||||||
// if (err != NULL) {
|
|
||||||
// taosMemoryFree(err);
|
|
||||||
// }
|
|
||||||
// if (rocksdb_iter_valid(iter)) rocksdb_iter_next(iter);
|
|
||||||
// }
|
|
||||||
// rocksdb_iter_destroy(iter);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue