Merge pull request #28273 from taosdata/fix/3.0/TD-32184
fix:[TD-32184] Support backwards compatibility for function's result info.
This commit is contained in:
commit
4c6dfe3308
|
@ -29,6 +29,7 @@ struct SqlFunctionCtx;
|
|||
struct SResultRowEntryInfo;
|
||||
|
||||
struct SFunctionNode;
|
||||
struct SExprSupp;
|
||||
typedef struct SScalarParam SScalarParam;
|
||||
typedef struct SStreamState SStreamState;
|
||||
|
||||
|
@ -43,6 +44,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
|||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
|
||||
typedef int32_t (*FExecDecode)(struct SqlFunctionCtx *pCtx, const char *buf, struct SResultRowEntryInfo *pResultCellInfo, int32_t version);
|
||||
typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx
|
||||
|
||||
typedef struct SScalarFuncExecFuncs {
|
||||
|
@ -57,6 +59,7 @@ typedef struct SFuncExecFuncs {
|
|||
FExecFinalize finalize;
|
||||
FExecCombine combine;
|
||||
FExecCleanUp cleanup;
|
||||
FExecDecode decode;
|
||||
processFuncByRow processFuncByRow;
|
||||
} SFuncExecFuncs;
|
||||
|
||||
|
@ -65,6 +68,8 @@ typedef struct SFuncExecFuncs {
|
|||
#define TOP_BOTTOM_QUERY_LIMIT 100
|
||||
#define FUNCTIONS_NAME_MAX_LENGTH 32
|
||||
|
||||
#define FUNCTION_RESULT_INFO_VERSION 1
|
||||
|
||||
typedef struct SResultRowEntryInfo {
|
||||
bool initialized : 1; // output buffer has been initialized
|
||||
bool complete : 1; // query has completed
|
||||
|
@ -165,6 +170,11 @@ typedef struct STdbState {
|
|||
void *txn;
|
||||
} STdbState;
|
||||
|
||||
typedef struct SResultRowStore {
|
||||
int32_t (*resultRowPut)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||
int32_t (*resultRowGet)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||
} SResultRowStore;
|
||||
|
||||
struct SStreamState {
|
||||
STdbState *pTdbState;
|
||||
struct SStreamFileState *pFileState;
|
||||
|
@ -175,6 +185,8 @@ struct SStreamState {
|
|||
int64_t streamBackendRid;
|
||||
int8_t dump;
|
||||
int32_t tsIndex;
|
||||
SResultRowStore pResultRowStore;
|
||||
struct SExprSupp *pExprSupp;
|
||||
};
|
||||
|
||||
typedef struct SFunctionStateStore {
|
||||
|
|
|
@ -48,6 +48,7 @@ typedef struct SGroupResInfo {
|
|||
} SGroupResInfo;
|
||||
|
||||
typedef struct SResultRow {
|
||||
int32_t version;
|
||||
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
||||
int32_t offset : 29; // row index in buffer page
|
||||
bool startInterp; // the time window start timestamp has done the interpolation already.
|
||||
|
@ -152,6 +153,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
|
|||
return pRow;
|
||||
}
|
||||
|
||||
int32_t getResultRowFromBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||
int32_t putResultRowToBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||
|
||||
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
|
|
|
@ -88,11 +88,116 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||
}
|
||||
|
||||
rowSize += (numOfOutput * sizeof(bool));
|
||||
// expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
||||
return rowSize;
|
||||
}
|
||||
|
||||
// Convert buf read from rocksdb to result row
|
||||
int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||
if (inBuf == NULL || pSup == NULL) {
|
||||
qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||
SResultRow *pResultRow = NULL;
|
||||
size_t processedSize = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// calculate the size of output buffer
|
||||
*outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||
*outBuf = taosMemoryMalloc(*outBufSize);
|
||||
if (*outBuf == NULL) {
|
||||
qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
|
||||
return terrno;
|
||||
}
|
||||
pResultRow = (SResultRow*)*outBuf;
|
||||
(void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
|
||||
inBuf += sizeof(SResultRow);
|
||||
processedSize += sizeof(SResultRow);
|
||||
|
||||
for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
|
||||
int32_t len = *(int32_t*)inBuf;
|
||||
inBuf += sizeof(int32_t);
|
||||
processedSize += sizeof(int32_t);
|
||||
if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
|
||||
code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to decode result row, code:%d", code);
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
(void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
|
||||
}
|
||||
inBuf += len;
|
||||
processedSize += len;
|
||||
}
|
||||
|
||||
if (processedSize < inBufSize) {
|
||||
// stream stores extra data after result row
|
||||
size_t leftLen = inBufSize - processedSize;
|
||||
TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
|
||||
if (*outBuf == NULL) {
|
||||
qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
|
||||
return terrno;
|
||||
}
|
||||
(void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
|
||||
inBuf += leftLen;
|
||||
processedSize += leftLen;
|
||||
*outBufSize += leftLen;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Convert result row to buf for rocksdb
|
||||
int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||
if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
|
||||
qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||
SResultRow *pResultRow = (SResultRow*)inBuf;
|
||||
size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||
|
||||
if (rowSize > inBufSize) {
|
||||
qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
// calculate the size of output buffer
|
||||
*outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
|
||||
if (rowSize < inBufSize) {
|
||||
*outBufSize += inBufSize - rowSize;
|
||||
}
|
||||
|
||||
*outBuf = taosMemoryMalloc(*outBufSize);
|
||||
if (*outBuf == NULL) {
|
||||
qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
char *pBuf = *outBuf;
|
||||
pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
|
||||
(void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
|
||||
pBuf += sizeof(SResultRow);
|
||||
for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
|
||||
size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
|
||||
*(int32_t *) pBuf = (int32_t)len;
|
||||
pBuf += sizeof(int32_t);
|
||||
(void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
|
||||
pBuf += len;
|
||||
}
|
||||
|
||||
if (rowSize < inBufSize) {
|
||||
// stream stores extra data after result row
|
||||
size_t leftLen = inBufSize - rowSize;
|
||||
(void)memcpy(pBuf, inBuf + rowSize, leftLen);
|
||||
pBuf += leftLen;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
|
||||
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||
|
|
|
@ -2006,6 +2006,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||
pInfo->pState->pFileState = NULL;
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pInfo->pState->pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
code =
|
||||
pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
|
||||
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo),
|
||||
|
@ -2223,6 +2229,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||
pSup->pState->pFileState = NULL;
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pSup->pState->pExprSupp = pExpSup;
|
||||
|
||||
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize,
|
||||
sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
|
||||
pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
|
||||
|
@ -5385,6 +5397,12 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->pState->pFileState = NULL;
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pInfo->pState->pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH,
|
||||
|
|
|
@ -80,7 +80,7 @@ typedef struct {
|
|||
TdThreadRwlock chkpDirLock;
|
||||
int64_t dataWritten;
|
||||
|
||||
void* pMeta;
|
||||
void* pMeta;
|
||||
int8_t removeAllFiles;
|
||||
|
||||
} STaskDbWrapper;
|
||||
|
@ -153,7 +153,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
|
|||
void* taskDbAddRef(void* pTaskDb);
|
||||
void taskDbRemoveRef(void* pTaskDb);
|
||||
|
||||
void taskDbSetClearFileFlag(void* pTaskDb);
|
||||
void taskDbSetClearFileFlag(void* pTaskDb);
|
||||
|
||||
int streamStateOpenBackend(void* backend, SStreamState* pState);
|
||||
void streamStateCloseBackend(SStreamState* pState, bool remove);
|
||||
|
@ -169,7 +169,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
|
|||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
|
||||
|
@ -191,7 +191,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
|
|||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId);
|
||||
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur);
|
||||
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
|
||||
void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
|
||||
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
|
||||
int32_t* pVLen);
|
||||
|
@ -255,11 +256,11 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
|
|||
|
||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
|
||||
|
||||
int32_t bkdMgtCreate(char* path, SBkdMgt **bm);
|
||||
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
|
||||
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
|
||||
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
|
||||
void bkdMgtDestroy(SBkdMgt* bm);
|
||||
int32_t bkdMgtCreate(char* path, SBkdMgt** bm);
|
||||
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
|
||||
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
|
||||
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
|
||||
void bkdMgtDestroy(SBkdMgt* bm);
|
||||
|
||||
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
|
||||
const char* id);
|
||||
|
|
|
@ -3289,13 +3289,40 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
|
|||
int code = 0;
|
||||
|
||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen);
|
||||
char* dst = NULL;
|
||||
size_t size = 0;
|
||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
|
||||
} else {
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
|
||||
taosMemoryFree(dst);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||
int code = 0;
|
||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen);
|
||||
|
||||
char* tVal = NULL;
|
||||
size_t tValLen = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
|
||||
*pVal = tVal;
|
||||
*pVLen = tValLen;
|
||||
return code;
|
||||
}
|
||||
size_t pValLen = 0;
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
|
||||
*pVLen = (int32_t)pValLen;
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
|
@ -3351,7 +3378,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
|
|||
}
|
||||
|
||||
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
|
||||
code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
|
||||
code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -3395,7 +3422,8 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
|
|||
rocksdb_iter_prev(pCur->iter);
|
||||
}
|
||||
}
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
|
||||
int32_t* pVLen) {
|
||||
if (!pCur) return -1;
|
||||
SStateKey tkey;
|
||||
SStateKey* pKtmp = &tkey;
|
||||
|
@ -3411,7 +3439,35 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
|
|||
if (pVLen != NULL) {
|
||||
size_t vlen = 0;
|
||||
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
|
||||
*pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
|
||||
char* val = NULL;
|
||||
int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)val);
|
||||
if (len <= 0) {
|
||||
taosMemoryFree(val);
|
||||
return -1;
|
||||
}
|
||||
|
||||
char* tVal = val;
|
||||
size_t tVlen = len;
|
||||
|
||||
if (pVal != NULL) {
|
||||
if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
|
||||
int code =
|
||||
(pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(val);
|
||||
return code;
|
||||
}
|
||||
taosMemoryFree(val);
|
||||
*pVal = (char*)tVal;
|
||||
} else {
|
||||
stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
|
||||
pState->pResultRowStore.resultRowGet, pState->pExprSupp);
|
||||
*pVal = (char*)tVal;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
*pVLen = (int32_t)tVlen;
|
||||
}
|
||||
|
||||
*pKey = pKtmp->key;
|
||||
|
@ -3541,14 +3597,44 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
|
|||
|
||||
// func cf
|
||||
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
|
||||
int code = 0;
|
||||
char* dst = NULL;
|
||||
size_t size = 0;
|
||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
|
||||
return code;
|
||||
}
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
|
||||
taosMemoryFree(dst);
|
||||
|
||||
return code;
|
||||
}
|
||||
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
|
||||
return 0;
|
||||
int code = 0;
|
||||
char* tVal = NULL;
|
||||
size_t tValLen = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "func", key, tVal, &tValLen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
|
||||
*pVal = tVal;
|
||||
*pVLen = tValLen;
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t pValLen = 0;
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
|
||||
*pVLen = (int32_t)pValLen;
|
||||
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
|
||||
int code = 0;
|
||||
|
@ -3563,7 +3649,20 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
|
|||
if (value == NULL || vLen == 0) {
|
||||
stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
|
||||
}
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
|
||||
char* dst = NULL;
|
||||
size_t size = 0;
|
||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
|
||||
taosMemoryFree(dst);
|
||||
|
||||
return code;
|
||||
}
|
||||
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||
|
@ -3574,7 +3673,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
|
|||
void* tmp = NULL;
|
||||
int32_t vLen = 0;
|
||||
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen);
|
||||
if (code == 0 && key->win.skey == resKey.win.skey) {
|
||||
*key = resKey;
|
||||
|
||||
|
@ -3813,7 +3912,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
|
|||
return pCur;
|
||||
}
|
||||
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
|
||||
void** pVal, int32_t* pVLen) {
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -3847,13 +3947,27 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
|
|||
return -1;
|
||||
}
|
||||
|
||||
char* tVal = val;
|
||||
size_t tVlen = len;
|
||||
|
||||
if (pVal != NULL) {
|
||||
*pVal = (char*)val;
|
||||
if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
|
||||
int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(val);
|
||||
return code;
|
||||
}
|
||||
taosMemoryFree(val);
|
||||
*pVal = (char*)tVal;
|
||||
} else {
|
||||
*pVal = (char*)tVal;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
|
||||
if (pVLen != NULL) *pVLen = len;
|
||||
if (pVLen != NULL) *pVLen = (int32_t)tVlen;
|
||||
|
||||
*pKey = pKTmp->key;
|
||||
return 0;
|
||||
}
|
||||
|
@ -3867,6 +3981,7 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con
|
|||
|
||||
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||
int code = 0;
|
||||
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
|
||||
return code;
|
||||
}
|
||||
|
@ -4041,7 +4156,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
|
|||
c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey));
|
||||
|
||||
SSessionKey resKey = *key;
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
|
||||
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||
*curKey = resKey;
|
||||
streamStateFreeCur(pCur);
|
||||
|
@ -4050,7 +4165,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
|
|||
|
||||
if (c > 0) {
|
||||
streamStateCurNext_rocksdb(pCur);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
|
||||
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||
*curKey = resKey;
|
||||
streamStateFreeCur(pCur);
|
||||
|
@ -4058,7 +4173,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
|
|||
}
|
||||
} else if (c < 0) {
|
||||
streamStateCurPrev(pState, pCur);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
|
||||
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||
*curKey = resKey;
|
||||
streamStateFreeCur(pCur);
|
||||
|
@ -4088,7 +4203,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
|
|||
}
|
||||
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
||||
|
||||
if (code == 0) {
|
||||
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||
|
@ -4105,7 +4220,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
|
|||
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
|
||||
}
|
||||
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
||||
if (code == 0) {
|
||||
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||
memcpy(tmp, *pVal, *pVLen);
|
||||
|
@ -4132,7 +4247,7 @@ void streamStateSessionClear_rocksdb(SStreamState* pState) {
|
|||
SSessionKey delKey = {0};
|
||||
void* buf = NULL;
|
||||
int32_t size = 0;
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);
|
||||
if (code == 0 && size > 0) {
|
||||
memset(buf, 0, size);
|
||||
// refactor later
|
||||
|
@ -4160,7 +4275,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
|||
}
|
||||
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
||||
if (code == 0) {
|
||||
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
||||
memcpy(tmp, *pVal, valSize);
|
||||
|
@ -4180,7 +4295,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
|||
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
|
||||
}
|
||||
taosMemoryFreeClear(*pVal);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
|
||||
if (code == 0) {
|
||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||
if (fn(pKeyData, stateKey) == true) {
|
||||
|
@ -4204,14 +4319,34 @@ _end:
|
|||
#ifdef BUILD_NO_CALL
|
||||
// partag cf
|
||||
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
|
||||
int code = 0;
|
||||
char* dst = NULL;
|
||||
size_t size = 0;
|
||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
|
||||
return code;
|
||||
}
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, size);
|
||||
taosMemoryFree(dst);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);
|
||||
int code = 0;
|
||||
char* tVal;
|
||||
size_t tValLen = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen);
|
||||
taosMemoryFree(tVal);
|
||||
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
@ -4348,6 +4483,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
|
|||
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen, int64_t ttl) {
|
||||
int32_t code = 0;
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
|
||||
|
||||
|
@ -4377,10 +4513,23 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
|
|||
|
||||
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
|
||||
int32_t code = 0;
|
||||
char buf[128] = {0};
|
||||
|
||||
char* dst = NULL;
|
||||
size_t size = 0;
|
||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
dst = val;
|
||||
size = vlen;
|
||||
} else {
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
|
||||
char* ttlV = tmpBuf;
|
||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);
|
||||
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
|
||||
|
@ -4389,6 +4538,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
|||
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||
|
||||
taosMemoryFree(dst);
|
||||
|
||||
if (tmpBuf == NULL) {
|
||||
taosMemoryFree(ttlV);
|
||||
}
|
||||
|
|
|
@ -284,7 +284,7 @@ _end:
|
|||
|
||||
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
|
||||
int32_t* pWinCode) {
|
||||
SWinKey* pTmpkey = pKey;
|
||||
SWinKey* pTmpkey = pKey;
|
||||
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
|
||||
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
|
||||
}
|
||||
|
@ -343,7 +343,8 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
|
||||
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
|
||||
int32_t* pWinCode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||
|
@ -353,7 +354,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
|
|||
}
|
||||
pNewPos->needFree = true;
|
||||
pNewPos->beFlushed = true;
|
||||
void* pBuff = NULL;
|
||||
void* pBuff = NULL;
|
||||
(*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
|
||||
if ((*pWinCode) != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
|
@ -575,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur)
|
|||
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
|
||||
SStreamStateCur** ppCur) {
|
||||
SSessionKey key = {.groupId = groupId};
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
|
||||
if (!(*ppCur)) {
|
||||
|
@ -653,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
|||
SSessionKey key = {0};
|
||||
void* pVal = NULL;
|
||||
int len = 0;
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
|
||||
if (code == TSDB_CODE_FAILED) {
|
||||
streamStateFreeCur(pCur);
|
||||
return pBuffCur;
|
||||
|
@ -667,7 +668,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
|||
}
|
||||
streamStateCurPrev(pFileStore, pCur);
|
||||
while (1) {
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
|
||||
if (code == TSDB_CODE_FAILED) {
|
||||
streamStateCurNext(pFileStore, pCur);
|
||||
return pCur;
|
||||
|
@ -710,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
|||
*pKey = *(SSessionKey*)(pPos->pKey);
|
||||
} else {
|
||||
void* pData = NULL;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
|
||||
transformCursor(pCur->pStreamFileState, pCur);
|
||||
|
@ -915,7 +916,7 @@ _end:
|
|||
|
||||
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
|
||||
streamStateFreeCur(pCur);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -923,7 +924,7 @@ int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void**
|
|||
pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
|
||||
}
|
||||
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
|
||||
streamStateFreeCur(pCur);
|
||||
return code;
|
||||
}
|
||||
|
@ -1060,7 +1061,8 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
|
||||
int32_t* pVLen) {
|
||||
SSessionKey* pWinKey = pKey;
|
||||
const TSKEY gap = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1098,7 +1100,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
|
||||
pWinKey->win.ekey, code_file);
|
||||
pWinKey->win.ekey, code_file);
|
||||
}
|
||||
} else {
|
||||
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
|
||||
|
|
|
@ -302,7 +302,7 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
|
|||
}
|
||||
|
||||
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
|
||||
return streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen);
|
||||
}
|
||||
|
||||
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
|
@ -529,6 +529,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
|
|||
}
|
||||
dst->dump = 1;
|
||||
dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
|
||||
dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut;
|
||||
dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet;
|
||||
dst->pExprSupp = src->pExprSupp;
|
||||
return;
|
||||
}
|
||||
SStreamStateCur* createStreamStateCursor() {
|
||||
|
|
|
@ -698,7 +698,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
|
|||
|
||||
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
|
||||
|
||||
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
|
||||
int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
|
||||
char* buf = taosMemoryCalloc(1, len);
|
||||
if (!buf) {
|
||||
code = terrno;
|
||||
|
@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
|
|||
void* pVal = NULL;
|
||||
int32_t vlen = 0;
|
||||
SSessionKey key = {0};
|
||||
winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
|
||||
winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
|
||||
if (winRes != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
@ -903,7 +903,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
winCode = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||
winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||
if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
||||
destroyRowBuffPos(pNewPos);
|
||||
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
||||
|
@ -912,6 +912,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
break;
|
||||
}
|
||||
if (vlen != pFileState->rowSize) {
|
||||
qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
|
|
@ -228,17 +228,17 @@ void *backendOpen() {
|
|||
memset(&key, 0, sizeof(key));
|
||||
char *val = NULL;
|
||||
int32_t vlen = 0;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key);
|
||||
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
|
||||
ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]);
|
||||
|
||||
pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(vlen == strlen("Value"));
|
||||
ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]);
|
||||
|
|
Loading…
Reference in New Issue