add backend

This commit is contained in:
yihaoDeng 2023-03-28 02:25:11 +00:00
parent 47bd13a7b1
commit e1046015ae
2 changed files with 32 additions and 16 deletions

View File

@ -2709,7 +2709,7 @@ int32_t getOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow** pResult,
} }
int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qWarn("streamStateAddIfNotExist"); // qWarn("streamStateAddIfNotExist");
char* tVal = NULL; char* tVal = NULL;
int32_t size = 0; int32_t size = 0;
int32_t code = streamStateGet(pState, key, (void**)&tVal, &size); int32_t code = streamStateGet(pState, key, (void**)&tVal, &size);
@ -2803,8 +2803,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes, qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete,
pEnryInfo->numOfRes); pEnryInfo->isNullRes, pEnryInfo->numOfRes);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) { if (TAOS_FAILED(code1)) {

View File

@ -451,6 +451,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \
if (val == NULL) { \ if (val == NULL) { \
qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \ code = -1; \
} else { \ } else { \
if (pVal != NULL) *pVal = val; \ if (pVal != NULL) *pVal = val; \
@ -551,20 +552,35 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear_rocksdb(SStreamState* pState) { int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SWinKey key = {.ts = 0, .groupId = 0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = UINT64_MAX, .groupId = INT64_MAX}, .opNum = pState->number};
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&sKey, eKeyStr);
char* err = NULL;
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0],
sKeyStr, sLen, eKeyStr, eLen, &err);
if (err != NULL) {
qWarn("failed to delete range cf(default)");
}
// batch clear later // batch clear later
streamStatePut_rocksdb(pState, &key, NULL, 0); // streamStatePut_rocksdb(pState, &key, NULL, 0);
while (1) { // while (1) {
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key); // SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key);
SWinKey delKey = {0}; // SWinKey delKey = {0};
int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0); // int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0);
streamStateFreeCur(pCur); // streamStateFreeCur(pCur);
if (code == 0) { // if (code == 0) {
streamStateDel_rocksdb(pState, &delKey); // streamStateDel_rocksdb(pState, &delKey);
} else { // } else {
break; // break;
} // }
} // }
return 0; return 0;
} }