stream interp force window close

This commit is contained in:
54liuyao 2024-08-06 16:14:15 +08:00
parent 2f9490d8bd
commit bdaf0d419e
25 changed files with 4683 additions and 5370 deletions

View File

@ -178,6 +178,7 @@ typedef enum EStreamType {
STREAM_TRANS_STATE, STREAM_TRANS_STATE,
STREAM_MID_RETRIEVE, STREAM_MID_RETRIEVE,
STREAM_PARTITION_DELETE_DATA, STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
} EStreamType; } EStreamType;
#pragma pack(push, 1) #pragma pack(push, 1)
@ -407,6 +408,10 @@ typedef struct STUidTagInfo {
#define TABLE_NAME_COLUMN_INDEX 6 #define TABLE_NAME_COLUMN_INDEX 6
#define PRIMARY_KEY_COLUMN_INDEX 7 #define PRIMARY_KEY_COLUMN_INDEX 7
//steam get result block column
#define DATA_TS_COLUMN_INDEX 0
#define DATA_VERSION_COLUMN_INDEX 1
// stream create table block column // stream create table block column
#define UD_TABLE_NAME_COLUMN_INDEX 0 #define UD_TABLE_NAME_COLUMN_INDEX 0
#define UD_GROUPID_COLUMN_INDEX 1 #define UD_GROUPID_COLUMN_INDEX 1

View File

@ -2750,6 +2750,8 @@ typedef struct {
#define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2 #define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3 #define STREAM_TRIGGER_MAX_DELAY 3
#define STREAM_TRIGGER_FORCE_WINDOW_CLOSE 4
#define STREAM_DEFAULT_IGNORE_EXPIRED 1 #define STREAM_DEFAULT_IGNORE_EXPIRED 1
#define STREAM_FILL_HISTORY_ON 1 #define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0 #define STREAM_FILL_HISTORY_OFF 0

View File

@ -248,151 +248,153 @@
#define TK_TRIGGER 230 #define TK_TRIGGER 230
#define TK_AT_ONCE 231 #define TK_AT_ONCE 231
#define TK_WINDOW_CLOSE 232 #define TK_WINDOW_CLOSE 232
#define TK_IGNORE 233 #define TK_FORCE_WINDOW_CLOSE 233
#define TK_EXPIRED 234 #define TK_IGNORE 234
#define TK_FILL_HISTORY 235 #define TK_EXPIRED 235
#define TK_UPDATE 236 #define TK_FILL_HISTORY 236
#define TK_SUBTABLE 237 #define TK_UPDATE 237
#define TK_UNTREATED 238 #define TK_SUBTABLE 238
#define TK_KILL 239 #define TK_UNTREATED 239
#define TK_CONNECTION 240 #define TK_KILL 240
#define TK_TRANSACTION 241 #define TK_CONNECTION 241
#define TK_BALANCE 242 #define TK_TRANSACTION 242
#define TK_VGROUP 243 #define TK_BALANCE 243
#define TK_LEADER 244 #define TK_VGROUP 244
#define TK_MERGE 245 #define TK_LEADER 245
#define TK_REDISTRIBUTE 246 #define TK_MERGE 246
#define TK_SPLIT 247 #define TK_REDISTRIBUTE 247
#define TK_DELETE 248 #define TK_SPLIT 248
#define TK_INSERT 249 #define TK_DELETE 249
#define TK_NK_BIN 250 #define TK_INSERT 250
#define TK_NK_HEX 251 #define TK_NK_BIN 251
#define TK_NULL 252 #define TK_NK_HEX 252
#define TK_NK_QUESTION 253 #define TK_NULL 253
#define TK_NK_ALIAS 254 #define TK_NK_QUESTION 254
#define TK_NK_ARROW 255 #define TK_NK_ALIAS 255
#define TK_ROWTS 256 #define TK_NK_ARROW 256
#define TK_QSTART 257 #define TK_ROWTS 257
#define TK_QEND 258 #define TK_QSTART 258
#define TK_QDURATION 259 #define TK_QEND 259
#define TK_WSTART 260 #define TK_QDURATION 260
#define TK_WEND 261 #define TK_WSTART 261
#define TK_WDURATION 262 #define TK_WEND 262
#define TK_IROWTS 263 #define TK_WDURATION 263
#define TK_ISFILLED 264 #define TK_IROWTS 264
#define TK_CAST 265 #define TK_ISFILLED 265
#define TK_NOW 266 #define TK_CAST 266
#define TK_TODAY 267 #define TK_NOW 267
#define TK_TIMEZONE 268 #define TK_TODAY 268
#define TK_CLIENT_VERSION 269 #define TK_TIMEZONE 269
#define TK_SERVER_VERSION 270 #define TK_CLIENT_VERSION 270
#define TK_SERVER_STATUS 271 #define TK_SERVER_VERSION 271
#define TK_CURRENT_USER 272 #define TK_SERVER_STATUS 272
#define TK_CASE 273 #define TK_CURRENT_USER 273
#define TK_WHEN 274 #define TK_CASE 274
#define TK_THEN 275 #define TK_WHEN 275
#define TK_ELSE 276 #define TK_THEN 276
#define TK_BETWEEN 277 #define TK_ELSE 277
#define TK_IS 278 #define TK_BETWEEN 278
#define TK_NK_LT 279 #define TK_IS 279
#define TK_NK_GT 280 #define TK_NK_LT 280
#define TK_NK_LE 281 #define TK_NK_GT 281
#define TK_NK_GE 282 #define TK_NK_LE 282
#define TK_NK_NE 283 #define TK_NK_GE 283
#define TK_MATCH 284 #define TK_NK_NE 284
#define TK_NMATCH 285 #define TK_MATCH 285
#define TK_CONTAINS 286 #define TK_NMATCH 286
#define TK_IN 287 #define TK_CONTAINS 287
#define TK_JOIN 288 #define TK_IN 288
#define TK_INNER 289 #define TK_JOIN 289
#define TK_LEFT 290 #define TK_INNER 290
#define TK_RIGHT 291 #define TK_LEFT 291
#define TK_OUTER 292 #define TK_RIGHT 292
#define TK_SEMI 293 #define TK_OUTER 293
#define TK_ANTI 294 #define TK_SEMI 294
#define TK_ASOF 295 #define TK_ANTI 295
#define TK_WINDOW 296 #define TK_ASOF 296
#define TK_WINDOW_OFFSET 297 #define TK_WINDOW 297
#define TK_JLIMIT 298 #define TK_WINDOW_OFFSET 298
#define TK_SELECT 299 #define TK_JLIMIT 299
#define TK_NK_HINT 300 #define TK_SELECT 300
#define TK_DISTINCT 301 #define TK_NK_HINT 301
#define TK_WHERE 302 #define TK_DISTINCT 302
#define TK_PARTITION 303 #define TK_WHERE 303
#define TK_BY 304 #define TK_PARTITION 304
#define TK_SESSION 305 #define TK_BY 305
#define TK_STATE_WINDOW 306 #define TK_SESSION 306
#define TK_EVENT_WINDOW 307 #define TK_STATE_WINDOW 307
#define TK_COUNT_WINDOW 308 #define TK_EVENT_WINDOW 308
#define TK_SLIDING 309 #define TK_COUNT_WINDOW 309
#define TK_FILL 310 #define TK_SLIDING 310
#define TK_VALUE 311 #define TK_FILL 311
#define TK_VALUE_F 312 #define TK_VALUE 312
#define TK_NONE 313 #define TK_VALUE_F 313
#define TK_PREV 314 #define TK_NONE 314
#define TK_NULL_F 315 #define TK_PREV 315
#define TK_LINEAR 316 #define TK_NULL_F 316
#define TK_NEXT 317 #define TK_LINEAR 317
#define TK_HAVING 318 #define TK_NEXT 318
#define TK_RANGE 319 #define TK_HAVING 319
#define TK_EVERY 320 #define TK_RANGE 320
#define TK_ORDER 321 #define TK_EVERY 321
#define TK_SLIMIT 322 #define TK_ORDER 322
#define TK_SOFFSET 323 #define TK_SLIMIT 323
#define TK_LIMIT 324 #define TK_SOFFSET 324
#define TK_OFFSET 325 #define TK_LIMIT 325
#define TK_ASC 326 #define TK_OFFSET 326
#define TK_NULLS 327 #define TK_ASC 327
#define TK_ABORT 328 #define TK_NULLS 328
#define TK_AFTER 329 #define TK_ABORT 329
#define TK_ATTACH 330 #define TK_AFTER 330
#define TK_BEFORE 331 #define TK_ATTACH 331
#define TK_BEGIN 332 #define TK_BEFORE 332
#define TK_BITAND 333 #define TK_BEGIN 333
#define TK_BITNOT 334 #define TK_BITAND 334
#define TK_BITOR 335 #define TK_BITNOT 335
#define TK_BLOCKS 336 #define TK_BITOR 336
#define TK_CHANGE 337 #define TK_BLOCKS 337
#define TK_COMMA 338 #define TK_CHANGE 338
#define TK_CONCAT 339 #define TK_COMMA 339
#define TK_CONFLICT 340 #define TK_CONCAT 340
#define TK_COPY 341 #define TK_CONFLICT 341
#define TK_DEFERRED 342 #define TK_COPY 342
#define TK_DELIMITERS 343 #define TK_DEFERRED 343
#define TK_DETACH 344 #define TK_DELIMITERS 344
#define TK_DIVIDE 345 #define TK_DETACH 345
#define TK_DOT 346 #define TK_DIVIDE 346
#define TK_EACH 347 #define TK_DOT 347
#define TK_FAIL 348 #define TK_EACH 348
#define TK_FOR 349 #define TK_FAIL 349
#define TK_GLOB 350 #define TK_FOR 350
#define TK_ID 351 #define TK_GLOB 351
#define TK_IMMEDIATE 352 #define TK_ID 352
#define TK_IMPORT 353 #define TK_IMMEDIATE 353
#define TK_INITIALLY 354 #define TK_IMPORT 354
#define TK_INSTEAD 355 #define TK_INITIALLY 355
#define TK_ISNULL 356 #define TK_INSTEAD 356
#define TK_MODULES 357 #define TK_ISNULL 357
#define TK_NK_BITNOT 358 #define TK_MODULES 358
#define TK_NK_SEMI 359 #define TK_NK_BITNOT 359
#define TK_NOTNULL 360 #define TK_NK_SEMI 360
#define TK_OF 361 #define TK_NOTNULL 361
#define TK_PLUS 362 #define TK_OF 362
#define TK_PRIVILEGE 363 #define TK_PLUS 363
#define TK_RAISE 364 #define TK_PRIVILEGE 364
#define TK_RESTRICT 365 #define TK_RAISE 365
#define TK_ROW 366 #define TK_RESTRICT 366
#define TK_STAR 367 #define TK_ROW 367
#define TK_STATEMENT 368 #define TK_STAR 368
#define TK_STRICT 369 #define TK_STATEMENT 369
#define TK_STRING 370 #define TK_STRICT 370
#define TK_TIMES 371 #define TK_STRING 371
#define TK_VALUES 372 #define TK_TIMES 372
#define TK_VARIABLE 373 #define TK_VALUES 373
#define TK_WAL 374 #define TK_VARIABLE 374
#define TK_ENCODE 375 #define TK_WAL 375
#define TK_COMPRESS 376 #define TK_ENCODE 376
#define TK_LEVEL 377 #define TK_COMPRESS 377
#define TK_LEVEL 378
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601

View File

@ -326,6 +326,9 @@ typedef struct {
int64_t number; int64_t number;
void* pStreamFileState; void* pStreamFileState;
int32_t buffIndex; int32_t buffIndex;
int32_t hashIter;
void* pHashData;
int64_t minGpId;
} SStreamStateCur; } SStreamStateCur;
typedef struct SStateStore { typedef struct SStateStore {
@ -352,6 +355,8 @@ typedef struct SStateStore {
int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode); int32_t* pWinCode);
int32_t (*streamStateFillAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
void (*streamStateFillDel)(SStreamState* pState, const SWinKey* key); void (*streamStateFillDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
int32_t* pVLen, int32_t* pWinCode); int32_t* pVLen, int32_t* pWinCode);
@ -418,6 +423,11 @@ typedef struct SStateStore {
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark,
const char* id, int64_t ckId, int8_t type); const char* id, int64_t ckId, int8_t type);
int32_t (*streamStateGroupPut)(SStreamState* pState, int64_t groupId, void* value, int32_t vLen);
SStreamStateCur* (*streamStateGroupGetCur)(SStreamState* pState);
void (*streamStateGroupCurNext)(SStreamStateCur* pCur);
int32_t (*streamStateGroupGetKVByCur)(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState);
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);

View File

@ -76,11 +76,13 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
// fill // fill
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
int32_t streamStateFillAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
void streamStateFillDel(SStreamState* pState, const SWinKey* key); void streamStateFillDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pVLen, int32_t* pWinCode); int32_t* pWinCode);
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pVLen, int32_t* pWinCode); int32_t* pWinCode);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode); int32_t* pWinCode);
@ -109,6 +111,12 @@ void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode);
// group id
int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen);
SStreamStateCur* streamStateGroupGetCur(SStreamState* pState);
void streamStateGroupCurNext(SStreamStateCur* pCur);
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts); void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
void streamStateCopyBackend(SStreamState* src, SStreamState* dst); void streamStateCopyBackend(SStreamState* src, SStreamState* dst);

View File

@ -56,6 +56,8 @@ bool needClearDiskBuff(SStreamFileState* pFileState);
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used); void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used);
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode); int32_t* pWinCode);
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
@ -98,6 +100,7 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff); void sessionWinStateCleanup(void* pBuff);
SStreamStateCur* createStateCursor(SStreamFileState* pFileState);
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey); SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey); SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey); SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
@ -136,6 +139,11 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId);
void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey); void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey);
//group
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen);
void streamFileStateGroupCurNext(SStreamStateCur* pCur);
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1349,6 +1349,8 @@ static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
strcpy(dst, "window close"); strcpy(dst, "window close");
} else if (trigger == STREAM_TRIGGER_MAX_DELAY) { } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
strcpy(dst, "max delay"); strcpy(dst, "max delay");
} else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
strcpy(dst, "force window close");
} }
} }

View File

@ -49,6 +49,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet; pStore->streamStateFillGet = streamStateFillGet;
pStore->streamStateFillAddIfNotExist = streamStateFillAddIfNotExist;
pStore->streamStateFillDel = streamStateFillDel; pStore->streamStateFillDel = streamStateFillDel;
pStore->streamStateFillGetNext = streamStateFillGetNext; pStore->streamStateFillGetNext = streamStateFillGetNext;
pStore->streamStateFillGetPrev = streamStateFillGetPrev; pStore->streamStateFillGetPrev = streamStateFillGetPrev;
@ -102,6 +103,11 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev; pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext; pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamStateGroupPut = streamStateGroupPut;
pStore->streamStateGroupGetCur = streamStateGroupGetCur;
pStore->streamStateGroupCurNext = streamStateGroupCurNext;
pStore->streamStateGroupGetKVByCur = streamStateGroupGetKVByCur;
pStore->streamFileStateDestroy = streamFileStateDestroy; pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear; pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff; pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -165,6 +165,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet; pStore->streamStateFillGet = streamStateFillGet;
pStore->streamStateFillAddIfNotExist = streamStateFillAddIfNotExist;
pStore->streamStateFillDel = streamStateFillDel; pStore->streamStateFillDel = streamStateFillDel;
pStore->streamStateFillGetNext = streamStateFillGetNext; pStore->streamStateFillGetNext = streamStateFillGetNext;
pStore->streamStateFillGetPrev = streamStateFillGetPrev; pStore->streamStateFillGetPrev = streamStateFillGetPrev;
@ -216,6 +217,11 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev; pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext; pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamStateGroupPut = streamStateGroupPut;
pStore->streamStateGroupGetCur = streamStateGroupGetCur;
pStore->streamStateGroupCurNext = streamStateGroupCurNext;
pStore->streamStateGroupGetKVByCur = streamStateGroupGetKVByCur;
pStore->streamFileStateDestroy = streamFileStateDestroy; pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear; pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff; pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -476,6 +476,7 @@ typedef struct SStreamScanInfo {
STqReader* tqReader; STqReader* tqReader;
uint64_t groupId; uint64_t groupId;
bool igCheckGroupId;
struct SUpdateInfo* pUpdateInfo; struct SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode; EStreamScanMode scanMode;

View File

@ -1787,7 +1787,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pExp->base.numOfParams = 1; pExp->base.numOfParams = 1;
SDataType* pType = &pCond->node.resType; SDataType* pType = &pCond->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode; pExp->pExpr->_optrRoot.pRootNode = pNode;
} }
} else { } else {
@ -2664,7 +2665,9 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr); qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
return; return;
} else if (pBlock->info.rows == 0) { } else if (pBlock->info.rows == 0) {
qDebug("%s===stream===%s %s: Block is Empty. block type %d", taskIdStr, flag, opStr, pBlock->info.type); qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlock->info.version);
return; return;
} }
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {

View File

@ -1483,8 +1483,12 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData; uint64_t* groupCol = (uint64_t*)pColInfo->pData;
ASSERT(rowIndex < pBlock->info.rows); ASSERT(rowIndex < pBlock->info.rows);
if (colDataIsNull_s(pColInfo, rowIndex)) {
pInfo->igCheckGroupId = true;
} else {
pInfo->groupId = groupCol[rowIndex]; pInfo->groupId = groupCol[rowIndex];
} }
}
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) { void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
pTableScanInfo->base.cond.twindows = *pWin; pTableScanInfo->base.cond.twindows = *pWin;
@ -1751,6 +1755,12 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts
continue; continue;
} }
if (pInfo->igCheckGroupId == true) {
pResult->info.calWin = pInfo->updateWin;
(*ppRes) = pResult;
goto _end;
}
if (pInfo->partitionSup.needCalc) { if (pInfo->partitionSup.needCalc) {
SSDataBlock* tmpBlock = NULL; SSDataBlock* tmpBlock = NULL;
code = createOneDataBlock(pResult, true, &tmpBlock); code = createOneDataBlock(pResult, true, &tmpBlock);
@ -1825,10 +1835,10 @@ int32_t appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSK
code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false); code = colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, pUid == NULL);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false); code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, pGp == NULL);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false); code = colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false);
@ -2065,8 +2075,8 @@ _end:
return code; return code;
} }
int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, int64_t groupId, int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end,
STimeWindow* pScanRange, STimeWindow* pDelRange) { int64_t groupId, STimeWindow* pScanRange, STimeWindow* pDelRange) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS; int32_t winCode = TSDB_CODE_SUCCESS;
@ -3157,6 +3167,12 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) {
isTimeSlice(pInfo); isTimeSlice(pInfo);
} }
static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) {
TSKEY start = src->info.window.skey;
TSKEY end = src->info.window.ekey;
return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL);
}
static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -3380,13 +3396,22 @@ FETCH_NEXT_BLOCK:
} }
} }
} break; } break;
case STREAM_GET_RESULT: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
code = copyGetResultBlock(pInfo->pUpdateRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pUpdateInfo->maxDataVersion = pBlock->info.version;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
} break;
case STREAM_CHECKPOINT: { case STREAM_CHECKPOINT: {
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
} break; } break;
default: default:
break; break;
} }
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type); setStreamOperatorState(&pInfo->basic, pBlock->info.type);
(*ppRes) = pBlock; (*ppRes) = pBlock;
return code; return code;
@ -4129,6 +4154,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->igCheckGroupId = false;
pInfo->pStreamScanOp = pOperator; pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes); code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes);

View File

@ -591,7 +591,7 @@ static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
} }
static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint, int32_t* pWinCode) { SSlicePoint* pNextPoint) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t tmpRes = TSDB_CODE_SUCCESS; int32_t tmpRes = TSDB_CODE_SUCCESS;
@ -605,7 +605,7 @@ static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStrea
pCurPoint->key.ts = ts; pCurPoint->key.ts = ts;
int32_t curVLen = 0; int32_t curVLen = 0;
code = code =
pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
setPointBuff(pCurPoint, pFillSup); setPointBuff(pCurPoint, pFillSup);
@ -674,7 +674,7 @@ _end:
static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint, int32_t* pWinCode) { SSlicePoint* pNextPoint) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t tmpRes = TSDB_CODE_SUCCESS; int32_t tmpRes = TSDB_CODE_SUCCESS;
@ -688,13 +688,16 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
pCurPoint->key.ts = ts; pCurPoint->key.ts = ts;
int32_t curVLen = 0; int32_t curVLen = 0;
code = code =
pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes == TSDB_CODE_SUCCESS) {
setPointBuff(pCurPoint, pFillSup); setPointBuff(pCurPoint, pFillSup);
pFillSup->cur.key = pCurPoint->pRightRow->key; pFillSup->cur.key = pCurPoint->pRightRow->key;
pFillSup->cur.pRowVal = pCurPoint->pRightRow->pRowVal; pFillSup->cur.pRowVal = pCurPoint->pRightRow->pRowVal;
} else {
pFillSup->cur.key = pCurPoint->key.ts + 1;
}
pPrevPoint->key.groupId = groupId; pPrevPoint->key.groupId = groupId;
int32_t preVLen = 0; int32_t preVLen = 0;
@ -755,8 +758,8 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF
pNextPoint->key.ts = stw.skey; pNextPoint->key.ts = stw.skey;
int32_t curVLen = 0; int32_t curVLen = 0;
code = pAggSup->stateStore.streamStateFillGet(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos, &curVLen, code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos,
pWinCode); &curVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
setPointBuff(pNextPoint, pFillSup); setPointBuff(pNextPoint, pFillSup);
@ -800,8 +803,8 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu
pCurPoint->key.ts = ts; pCurPoint->key.ts = ts;
int32_t curVLen = 0; int32_t curVLen = 0;
code = code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos,
pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); &curVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
setPointBuff(pCurPoint, pFillSup); setPointBuff(pCurPoint, pFillSup);
@ -828,8 +831,8 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu
} else { } else {
pNextPoint->key.ts = taosTimeAdd(pCurPoint->key.ts, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pNextPoint->key.ts = taosTimeAdd(pCurPoint->key.ts, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision); pFillSup->interval.precision);
code = pAggSup->stateStore.streamStateFillGet(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos, &nextVLen, code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos,
&tmpRes); &nextVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
setPointBuff(pNextPoint, pFillSup); setPointBuff(pNextPoint, pFillSup);
if (tmpRes != TSDB_CODE_SUCCESS) { if (tmpRes != TSDB_CODE_SUCCESS) {
@ -1021,6 +1024,30 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE
pRowVal->key = ts; pRowVal->key = ts;
} }
static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, int8_t trigger, SWinKey* pKey,
SSHashObj* pUpdatedMap, bool needDel, SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
code = saveTimeSliceWinResult(pKey, pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
if (needDel) {
code = saveTimeSliceWinResult(pKey, pDeletedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
} else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, pKey->groupId, NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
@ -1088,10 +1115,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type);
if (right) { if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); pInfo->pDeletedMap);
}
} }
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
@ -1103,10 +1129,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type);
if (left) { if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap, needDel,
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); pInfo->pDeletedMap);
}
} }
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
@ -1125,10 +1150,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type);
if (right) { if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); pInfo->pDeletedMap);
}
} }
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
} }
@ -1162,13 +1186,11 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
SSlicePoint prevPoint = {0}; SSlicePoint prevPoint = {0};
SSlicePoint nextPoint = {0}; SSlicePoint nextPoint = {0};
int32_t winCode = TSDB_CODE_SUCCESS;
if (pFillSup->type != TSDB_FILL_LINEAR) { if (pFillSup->type != TSDB_FILL_LINEAR) {
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
&winCode);
} else { } else {
code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, code =
&winCode); getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
} }
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -1291,6 +1313,28 @@ _end:
return code; return code;
} }
static int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int64_t groupId = 0;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
if (winCode != TSDB_CODE_SUCCESS) {
goto _end;
}
SWinKey key = {.ts = ts, .groupId = groupId};
code = saveTimeSliceWinResult(&key, pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
pAggSup->stateStore.streamStateGroupCurNext(pCur);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
@ -1376,6 +1420,10 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
(*ppRes) = pBlock; (*ppRes) = pBlock;
goto _end; goto _end;
} break; } break;
case STREAM_GET_RESULT: {
setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap);
continue;
}
default: default:
ASSERTS(false, "invalid SSDataBlock type"); ASSERTS(false, "invalid SSDataBlock type");
} }

View File

@ -762,6 +762,7 @@ tag_def_or_ref_opt(A) ::= TAGS NK_LP column_stream_def_list(B) NK_RP.
stream_options(A) ::= . { A = createStreamOptions(pCxt); } stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER FORCE_WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); } stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); }

View File

@ -2953,6 +2953,8 @@ static int8_t getTriggerType(uint32_t tokenType) {
return STREAM_TRIGGER_WINDOW_CLOSE; return STREAM_TRIGGER_WINDOW_CLOSE;
case TK_MAX_DELAY: case TK_MAX_DELAY:
return STREAM_TRIGGER_MAX_DELAY; return STREAM_TRIGGER_MAX_DELAY;
case TK_FORCE_WINDOW_CLOSE:
return STREAM_TRIGGER_FORCE_WINDOW_CLOSE;
default: default:
break; break;
} }

View File

@ -335,6 +335,7 @@ static SKeyword keywordTable[] = {
{"LEVEL", TK_LEVEL}, {"LEVEL", TK_LEVEL},
{"ARBGROUPS", TK_ARBGROUPS}, {"ARBGROUPS", TK_ARBGROUPS},
{"IS_IMPORT", TK_IS_IMPORT}, {"IS_IMPORT", TK_IS_IMPORT},
{"FORCE_WINDOW_CLOSE", TK_FORCE_WINDOW_CLOSE},
}; };
// clang-format on // clang-format on

View File

@ -10291,7 +10291,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
"SUBTABLE expression must not has column when no partition by clause"); "SUBTABLE expression must not has column when no partition by clause");
} }
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) { if (NULL == pSelect->pWindow && !pSelect->hasInterpFunc && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"The trigger mode of non window query can only be AT_ONCE"); "The trigger mode of non window query can only be AT_ONCE");
} }

File diff suppressed because it is too large Load Diff

View File

@ -211,6 +211,8 @@ SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState);
// partag cf // partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur);
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen);
// parname cf // parname cf
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);

View File

@ -4166,7 +4166,6 @@ _end:
return res; return res;
} }
#ifdef BUILD_NO_CALL
// partag cf // partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
int code = 0; int code = 0;
@ -4174,6 +4173,60 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons
return code; return code;
} }
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur) {
if (pCur == NULL) {
return ;
}
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->number = pState->number;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "partag", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
int i = streamStateGetCfIdx(pState, "partag");
if (i < 0) {
stError("streamState failed to put to cf name:%s", "partag");
return ;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)&groupId, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, klen)) {
streamStateFreeCur(pCur);
return ;
}
// skip ttl expired data
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_next(pCur->iter);
}
}
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
stDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
SWinKey winKey;
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1;
}
size_t klen, vlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
(void)parKeyDecode(pGroupId, keyStr);
if (pVal) {
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) {
return -1;
}
if (pVLen != NULL) *pVLen = len;
}
return 0;
}
#ifdef BUILD_NO_CALL
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
int code = 0; int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);

View File

@ -73,7 +73,7 @@ bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
return false; return false;
} }
SStreamStateCur* createSessionStateCursor(SStreamFileState* pFileState) { SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
pCur->pStreamFileState = pFileState; pCur->pStreamFileState = pFileState;
return pCur; return pCur;
@ -527,7 +527,7 @@ static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, co
} }
if (index >= 0) { if (index >= 0) {
pCur = createSessionStateCursor(pFileState); pCur = createStateCursor(pFileState);
pCur->buffIndex = index; pCur->buffIndex = index;
if (pIndex) { if (pIndex) {
*pIndex = index; *pIndex = index;
@ -568,7 +568,7 @@ static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t
if (taosArrayGetSize(pWinStates) > 0 && if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if (!(*ppCur)) { if (!(*ppCur)) {
(*ppCur) = createSessionStateCursor(pFileState); (*ppCur) = createStateCursor(pFileState);
} }
transformCursor(pFileState, *ppCur); transformCursor(pFileState, *ppCur);
} else if (*ppCur) { } else if (*ppCur) {
@ -628,7 +628,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
} }
pBuffCur->buffIndex = 0; pBuffCur->buffIndex = 0;
} else if (taosArrayGetSize(pWinStates) > 0) { } else if (taosArrayGetSize(pWinStates) > 0) {
pBuffCur = createSessionStateCursor(pFileState); pBuffCur = createStateCursor(pFileState);
pBuffCur->buffIndex = 0; pBuffCur->buffIndex = 0;
} }

View File

@ -32,7 +32,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode); code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
SArray* pWinStates = NULL; SArray* pWinStates = NULL;

View File

@ -128,10 +128,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
pState->pFileState = NULL; pState->pFileState = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
pState->parNameMap = tSimpleHashInit(1024, hashFn); pState->parNameMap = tSimpleHashInit(1024, hashFn);
if (!pState->parNameMap) { QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId, stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
pState->taskId); pState->taskId);
return pState; return pState;
@ -201,14 +199,10 @@ _end:
return code; return code;
} }
// todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { return 0; }
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return 0;
// return streamStatePut_rocksdb(pState, key, value, vLen);
}
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode); return addRowBuffIfNotExist(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
} }
bool streamStateCheck(SStreamState* pState, const SWinKey* key) { bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
@ -221,33 +215,36 @@ int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
return code; return code;
} }
// todo refactor
void streamStateDel(SStreamState* pState, const SWinKey* key) { void streamStateDel(SStreamState* pState, const SWinKey* key) {
deleteRowBuff(pState->pFileState, key, sizeof(SWinKey)); deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
} }
// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return streamStateFillPut_rocksdb(pState, key, value, vLen); return streamStateFillPut_rocksdb(pState, key, value, vLen);
} }
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
if (pState->pFileState) { if (pState->pFileState) {
return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen, pWinCode); return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
} }
return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
} }
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t streamStateFillAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
}
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
} }
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
} }
// todo refactor
void streamStateFillDel(SStreamState* pState, const SWinKey* key) { void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
if (pState->pFileState) { if (pState->pFileState) {
deleteHashSortRowBuff(pState->pFileState, key); deleteHashSortRowBuff(pState->pFileState, key);
@ -295,7 +292,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
} }
void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) { void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
// todo refactor
if (!pVal) { if (!pVal) {
return; return;
} }
@ -458,7 +454,6 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key,
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) { state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
// todo refactor
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen, pWinCode); return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen, pWinCode);
} }
@ -556,3 +551,25 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
return createCountWinResultBuff(pState->pFileState, pKey, pVal, pVLen); return createCountWinResultBuff(pState->pFileState, pKey, pVal, pVLen);
} }
int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen) {
return streamFileStateGroupPut(pState->pFileState, groupId, value, vLen);
}
SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
SStreamStateCur* pCur = createStateCursor(pState->pFileState);
pCur->hashIter = 0;
pCur->pHashData = NULL;
return pCur;
}
void streamStateGroupCurNext(SStreamStateCur* pCur) {
streamFileStateGroupCurNext(pCur);
}
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
if (pVal != NULL) {
return -1;
}
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
}

View File

@ -29,6 +29,7 @@
#define MIN_NUM_OF_RECOVER_ROW_BUFF 128 #define MIN_NUM_OF_RECOVER_ROW_BUFF 128
#define MIN_NUM_SEARCH_BUCKET 128 #define MIN_NUM_SEARCH_BUCKET 128
#define MAX_ARRAY_SIZE 1024 #define MAX_ARRAY_SIZE 1024
#define MAX_GROUP_ID_NUM 200000
#define TASK_KEY "streamFileState" #define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" #define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
@ -52,6 +53,7 @@ struct SStreamFileState {
char* id; char* id;
char* cfName; char* cfName;
void* searchBuff; void* searchBuff;
SSHashObj* pGroupIdMap;
_state_buff_cleanup_fn stateBuffCleanupFn; _state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn; _state_buff_remove_fn stateBuffRemoveFn;
@ -134,17 +136,19 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) { int8_t type) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (memSize <= 0) { if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
} }
if (rowSize == 0) { if (rowSize == 0) {
goto _error; code = TSDB_CODE_INVALID_PARA;
QUERY_CHECK_CODE(code, lino, _end);
} }
SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState)); SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
if (!pFileState) { QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
goto _error;
}
rowSize += selectRowSize; rowSize += selectRowSize;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->usedBuffs = tdListNew(POINTER_BYTES);
@ -161,7 +165,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFileRemoveFn = intervalFileRemoveFn; pFileState->stateFileRemoveFn = intervalFileRemoveFn;
pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileGetFn = intervalFileGetFn;
pFileState->cfName = taosStrdup("state"); pFileState->cfName = taosStrdup("state");
pFileState->stateFunctionGetFn = getRowBuff; pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
} else if (type == STREAM_STATE_BUFF_SORT) { } else if (type == STREAM_STATE_BUFF_SORT) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup; pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
@ -176,6 +180,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
} else if (type == STREAM_STATE_BUFF_HASH_SORT) { } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn); pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn; pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
@ -187,9 +192,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFunctionGetFn = NULL; pFileState->stateFunctionGetFn = NULL;
} }
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
goto _error; QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
} QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
pFileState->keyLen = keySize; pFileState->keyLen = keySize;
pFileState->rowSize = rowSize; pFileState->rowSize = rowSize;
@ -203,6 +209,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->flushMark = INT64_MIN; pFileState->flushMark = INT64_MIN;
pFileState->maxTs = INT64_MIN; pFileState->maxTs = INT64_MIN;
pFileState->id = taosStrdup(taskId); pFileState->id = taosStrdup(taskId);
QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
// todo(liuyao) optimize // todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) { if (type == STREAM_STATE_BUFF_HASH) {
@ -215,18 +225,21 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
void* valBuf = NULL; void* valBuf = NULL;
int32_t len = 0; int32_t len = 0;
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
if (code == TSDB_CODE_SUCCESS) { if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(len == sizeof(TSKEY)); ASSERT(len == sizeof(TSKEY));
streamFileStateDecode(&pFileState->flushMark, valBuf, len); streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
} }
taosMemoryFreeClear(valBuf); taosMemoryFreeClear(valBuf);
return pFileState;
_error: _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
streamFileStateDestroy(pFileState); streamFileStateDestroy(pFileState);
return NULL; pFileState = NULL;
}
return pFileState;
} }
void destroyRowBuffPos(SRowBuffPos* pPos) { void destroyRowBuffPos(SRowBuffPos* pPos) {
@ -430,7 +443,6 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true); clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
} }
flushSnapshot(pFileState, pFlushList, false); flushSnapshot(pFileState, pFlushList, false);
SListIter fIter = {0}; SListIter fIter = {0};
@ -548,7 +560,7 @@ _error:
return NULL; return NULL;
} }
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode) { int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
@ -992,3 +1004,99 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
(*pWinCode) = TSDB_CODE_FAILED;
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
(*pos)->beUsed = true;
(*pos)->beFlushed = false;
(*pWinCode) = TSDB_CODE_SUCCESS;
}
TSKEY ts = pFileState->getTs(pKey);
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
int32_t len = 0;
void* p = NULL;
(*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, keyLen);
memcpy(pNewPos->pRowBuff, p, len);
code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
QUERY_CHECK_CODE(code, lino, _end);
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
}
}
taosMemoryFree(p);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (value != NULL) {
code = TSDB_CODE_INVALID_PARA;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) == NULL) {
if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
}
code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
if (pCur->hashIter == -1) {
streamStateCurNext(pFileState->pFileStore, pCur);
}
SSHashObj* pHash = pFileState->pGroupIdMap;
pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
if (!pCur->pHashData) {
pCur->hashIter = -1;
streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
}
int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
pCur->minGpId = TMIN(pCur->minGpId, gpId);
}
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
if (pCur->pHashData) {
*pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
return code;
}
return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
}

View File

@ -62,6 +62,8 @@ system sh/exec.sh -n dnode1 -s start
sleep 2000 sleep 2000
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213002,3,2,3,1.1); sql insert into t1 values(1648791213002,3,2,3,1.1);
sql insert into t2 values(1648791233003,4,2,3,1.1); sql insert into t2 values(1648791233003,4,2,3,1.1);