Merge remote-tracking branch 'origin/enh/rocksRevert' into enh/rocksRevert
This commit is contained in:
commit
d420734971
|
@ -88,6 +88,8 @@ int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
|||
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||
int32_t streamStateClear(SStreamState* pState);
|
||||
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||
|
||||
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
|
||||
|
|
|
@ -36,6 +36,7 @@ int32_t scanDebug = 0;
|
|||
#define MULTI_READER_MAX_TABLE_NUM 5000
|
||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
|
||||
|
||||
typedef struct STableMergeScanExecInfo {
|
||||
SFileBlockLoadRecorder blockRecorder;
|
||||
|
@ -1771,7 +1772,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
|
|||
|
||||
// other properties are recovered from the execution plan
|
||||
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
|
||||
if (!pBuff) {
|
||||
if (!pBuff || len == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2054,10 +2055,12 @@ FETCH_NEXT_BLOCK:
|
|||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
doClearBufferedBlocks(pInfo);
|
||||
qDebug("stream scan return empty, consume block %d", totBlockNum);
|
||||
// void* buff = NULL;
|
||||
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
||||
// todo(liuyao) save buff
|
||||
// taosMemoryFreeClear(buff);
|
||||
void* buff = NULL;
|
||||
int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
||||
if (len > 0) {
|
||||
streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len);
|
||||
}
|
||||
taosMemoryFreeClear(buff);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2484,12 +2487,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->twAggSup.maxTs = INT64_MIN;
|
||||
pInfo->pState = NULL;
|
||||
|
||||
// todo(liuyao) get buff from rocks db;
|
||||
void* buff = NULL;
|
||||
int32_t len = 0;
|
||||
streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
|
||||
streamScanOperatorDeocde(buff, len, pInfo);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||
setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
|
||||
|
|
|
@ -2846,6 +2846,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
|||
}
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||
pScanInfo->pState = pAggSup->pState;
|
||||
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
|
||||
}
|
||||
|
|
|
@ -395,6 +395,32 @@ int32_t streamStateClear(SStreamState* pState) {
|
|||
|
||||
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
|
||||
|
||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
int32_t code = 0;
|
||||
void* batch = streamStateCreateBatch();
|
||||
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
code = streamStatePutBatch_rocksdb(pState, batch);
|
||||
streamStateDestroyBatch(batch);
|
||||
return code;
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
int32_t code = 0;
|
||||
code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
|
||||
return code;
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
return streamStateGet(pState, key, pVal, pVLen);
|
||||
|
@ -1066,7 +1092,7 @@ void streamStateDestroy(SStreamState* pState) {
|
|||
#ifdef USE_ROCKSDB
|
||||
streamFileStateDestroy(pState->pFileState);
|
||||
streamStateDestroy_rocksdb(pState);
|
||||
taosMemoryFreeClear(pState->parNameMap);
|
||||
tSimpleHashCleanup(pState->parNameMap);
|
||||
// do nothong
|
||||
#endif
|
||||
taosMemoryFreeClear(pState->pTdbState);
|
||||
|
|
|
@ -274,7 +274,10 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
|
|||
}
|
||||
|
||||
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
|
||||
ASSERT(pInfo);
|
||||
if(!pInfo) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
|
|
@ -48,23 +48,34 @@ sleep 100
|
|||
#===================================================================
|
||||
print =============== query data from child table
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != 234 then
|
||||
return -1
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data02 != 234 then
|
||||
return -1
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data03 != 234 then
|
||||
return -1
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
#===================================================================
|
||||
|
@ -77,36 +88,47 @@ sleep 100
|
|||
#===================================================================
|
||||
print =============== query data from child table
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
print $data10 $data11 $data12 $data13
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 234 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 234 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 234 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data11 != -111 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != -111 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data13 != -111 then
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue