enh(stream): internal optimize
This commit is contained in:
parent
ce1b6147a0
commit
035e6b13c8
|
@ -1726,13 +1726,14 @@ typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
char sourceDB[TSDB_DB_FNAME_LEN];
|
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||||
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
|
int8_t igExists;
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
|
int8_t igExpired;
|
||||||
|
int8_t fillHistory; // process data inserted before creating stream
|
||||||
int64_t maxDelay;
|
int64_t maxDelay;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
int8_t igExpired;
|
|
||||||
int32_t numOfTags;
|
int32_t numOfTags;
|
||||||
SArray* pTags; // array of SField
|
SArray* pTags; // array of SField
|
||||||
} SCMCreateStreamReq;
|
} SCMCreateStreamReq;
|
||||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask;
|
||||||
enum {
|
enum {
|
||||||
STREAM_STATUS__NORMAL = 0,
|
STREAM_STATUS__NORMAL = 0,
|
||||||
STREAM_STATUS__STOP,
|
STREAM_STATUS__STOP,
|
||||||
|
STREAM_STATUS__INIT,
|
||||||
STREAM_STATUS__FAILED,
|
STREAM_STATUS__FAILED,
|
||||||
STREAM_STATUS__RECOVER,
|
STREAM_STATUS__RECOVER,
|
||||||
};
|
};
|
||||||
|
@ -291,6 +292,9 @@ typedef struct SStreamTask {
|
||||||
int64_t recoverSnapVer;
|
int64_t recoverSnapVer;
|
||||||
int64_t startVer;
|
int64_t startVer;
|
||||||
|
|
||||||
|
// fill history
|
||||||
|
int8_t fillHistory;
|
||||||
|
|
||||||
// children info
|
// children info
|
||||||
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
|
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
|
||||||
int32_t nextCheckId;
|
int32_t nextCheckId;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
#include "tcommon.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tscalablebf.h"
|
#include "tscalablebf.h"
|
||||||
|
|
||||||
|
@ -24,6 +25,11 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct SUpdateKey {
|
||||||
|
int64_t tbUid;
|
||||||
|
TSKEY ts;
|
||||||
|
} SUpdateKey;
|
||||||
|
|
||||||
typedef struct SUpdateInfo {
|
typedef struct SUpdateInfo {
|
||||||
SArray *pTsBuckets;
|
SArray *pTsBuckets;
|
||||||
uint64_t numBuckets;
|
uint64_t numBuckets;
|
||||||
|
@ -41,6 +47,7 @@ typedef struct SUpdateInfo {
|
||||||
|
|
||||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
||||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||||
|
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||||
|
|
|
@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len);
|
||||||
uint32_t taosIntHash_32(const char *key, uint32_t len);
|
uint32_t taosIntHash_32(const char *key, uint32_t len);
|
||||||
uint32_t taosIntHash_64(const char *key, uint32_t len);
|
uint32_t taosIntHash_64(const char *key, uint32_t len);
|
||||||
|
|
||||||
|
uint32_t taosFastHash(const char *key, uint32_t len);
|
||||||
|
uint32_t taosDJB2Hash(const char *key, uint32_t len);
|
||||||
|
|
||||||
_hash_fn_t taosGetDefaultHashFunction(int32_t type);
|
_hash_fn_t taosGetDefaultHashFunction(int32_t type);
|
||||||
_equal_fn_t taosGetDefaultEqualFunction(int32_t type);
|
_equal_fn_t taosGetDefaultEqualFunction(int32_t type);
|
||||||
|
|
||||||
|
|
|
@ -613,6 +613,7 @@ typedef struct {
|
||||||
// config
|
// config
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
int8_t trigger;
|
int8_t trigger;
|
||||||
|
int8_t fillHistory;
|
||||||
int64_t triggerParam;
|
int64_t triggerParam;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
// source and target
|
// source and target
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||||
|
@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
|
||||||
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||||
|
|
||||||
|
@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
|
||||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||||
|
|
||||||
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||||
|
@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
|
|
||||||
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||||
|
|
||||||
|
@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
|
|
||||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -280,6 +280,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
pObj->trigger = pCreate->triggerType;
|
pObj->trigger = pCreate->triggerType;
|
||||||
pObj->triggerParam = pCreate->maxDelay;
|
pObj->triggerParam = pCreate->maxDelay;
|
||||||
pObj->watermark = pCreate->watermark;
|
pObj->watermark = pCreate->watermark;
|
||||||
|
pObj->fillHistory = pCreate->fillHistory;
|
||||||
|
|
||||||
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||||
|
@ -686,7 +687,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way
|
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
|
||||||
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||||
|
|
||||||
// create stb for stream
|
// create stb for stream
|
||||||
|
|
|
@ -162,6 +162,8 @@ typedef struct {
|
||||||
SQueryTableDataCond tableCond;
|
SQueryTableDataCond tableCond;
|
||||||
int64_t recoverStartVer;
|
int64_t recoverStartVer;
|
||||||
int64_t recoverEndVer;
|
int64_t recoverEndVer;
|
||||||
|
int64_t fillHistoryVer1;
|
||||||
|
int64_t fillHistoryVer2;
|
||||||
SStreamState* pState;
|
SStreamState* pState;
|
||||||
} SStreamTaskInfo;
|
} SStreamTaskInfo;
|
||||||
|
|
||||||
|
|
|
@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pResult != pSrcBlock) {
|
||||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||||
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
// if the source equals to the destination, it is to create a new column as the result of scalar
|
// if the source equals to the destination, it is to create a new column as the result of scalar
|
||||||
// function or some operators.
|
// function or some operators.
|
||||||
|
|
|
@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
@ -737,7 +736,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
pInfo->pColMatchInfo = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
pInfo->pColMatchInfo =
|
||||||
|
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1707,9 +1707,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if 1
|
||||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
|
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
||||||
|
pTSInfo->cond.startVersion = -1;
|
||||||
|
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||||
pTSInfo->scanTimes = 0;
|
pTSInfo->scanTimes = 0;
|
||||||
pTSInfo->currentGroupId = -1;
|
pTSInfo->currentGroupId = -1;
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||||
|
@ -1718,12 +1721,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||||
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
|
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
|
||||||
|
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
// TODO fill in bloom filter
|
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
// TODO: refactor
|
// TODO: refactor
|
||||||
|
@ -2109,7 +2114,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
|
@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
||||||
|
|
||||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
TSKEY maxTs = INT64_MIN;
|
TSKEY maxTs = INT64_MIN;
|
||||||
|
@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
||||||
&pInfo->interval, &pInfo->delKey);
|
&pInfo->interval, &pInfo->delKey);
|
||||||
|
streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4975,8 +4977,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
SInterval interval = {.interval = pNode->interval,
|
SInterval interval = {.interval = pNode->interval,
|
||||||
.sliding = pNode->sliding,
|
.sliding = pNode->sliding,
|
||||||
.intervalUnit = pNode->intervalUnit,
|
.intervalUnit = pNode->intervalUnit,
|
||||||
|
@ -5383,6 +5383,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
||||||
&pInfo->delKey);
|
&pInfo->delKey);
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
|
streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->fillHistory) {
|
||||||
|
// pipeline exec
|
||||||
|
// if finished, dispatch a stream-prepare-finished msg to downstream task
|
||||||
|
// set status normal
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
|
|
|
@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
/*if (tStartEncode(pEncoder) < 0) return -1;*/
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
|
||||||
|
@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
|
||||||
|
|
||||||
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
|
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
|
||||||
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
||||||
for (int32_t i = 0; i < epSz; i++) {
|
for (int32_t i = 0; i < epSz; i++) {
|
||||||
|
@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
||||||
|
|
||||||
/*tEndEncode(pEncoder);*/
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
/*if (tStartDecode(pDecoder) < 0) return -1;*/
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
|
||||||
|
@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
|
||||||
|
|
||||||
int32_t epSz;
|
int32_t epSz;
|
||||||
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
||||||
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
|
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
|
||||||
|
@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
||||||
|
|
||||||
/*tEndDecode(pDecoder);*/
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tencode.h"
|
#include "tencode.h"
|
||||||
#include "tstreamUpdate.h"
|
#include "tstreamUpdate.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
|
||||||
|
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||||
|
TSKEY maxTs = -1;
|
||||||
|
int64_t tbUid = pBlock->info.uid;
|
||||||
|
|
||||||
|
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||||
|
TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i];
|
||||||
|
maxTs = TMAX(maxTs, ts);
|
||||||
|
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||||
|
if (pSBf) {
|
||||||
|
tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||||
|
if (pMaxTs == NULL || *pMaxTs > tbUid) {
|
||||||
|
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
int32_t res = TSDB_CODE_FAILED;
|
int32_t res = TSDB_CODE_FAILED;
|
||||||
|
|
||||||
|
SUpdateKey updateKey = {
|
||||||
|
.tbUid = tableId,
|
||||||
|
.ts = ts,
|
||||||
|
};
|
||||||
|
|
||||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||||
if (ts < maxTs - pInfo->watermark) {
|
if (ts < maxTs - pInfo->watermark) {
|
||||||
// this window has been closed.
|
// this window has been closed.
|
||||||
if (pInfo->pCloseWinSBF) {
|
if (pInfo->pCloseWinSBF) {
|
||||||
res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
|
res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey));
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||||
// pSBf may be a null pointer
|
// pSBf may be a null pointer
|
||||||
if (pSBf) {
|
if (pSBf) {
|
||||||
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||||
|
|
|
@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
||||||
|
|
||||||
// ln(2) = 0.693147180559945
|
// ln(2) = 0.693147180559945
|
||||||
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
|
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
|
||||||
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
|
||||||
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
|
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
|
||||||
|
pBF->hashFn1 = taosFastHash;
|
||||||
|
pBF->hashFn2 = taosDJB2Hash;
|
||||||
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||||
if (pBF->buffer == NULL) {
|
if (pBF->buffer == NULL) {
|
||||||
tBloomFilterDestroy(pBF);
|
tBloomFilterDestroy(pBF);
|
||||||
|
|
|
@ -32,6 +32,23 @@
|
||||||
(h) ^= (h) >> 16; \
|
(h) ^= (h) >> 16; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
uint32_t taosFastHash(const char *key, uint32_t len) {
|
||||||
|
uint32_t result = 0x55555555;
|
||||||
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
|
result ^= (uint8_t)key[i];
|
||||||
|
result = ROTL32(result, 5);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t taosDJB2Hash(const char *key, uint32_t len) {
|
||||||
|
uint32_t hash = 5381;
|
||||||
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
|
hash = ((hash << 5) + hash) + (uint8_t)key[i]; /* hash * 33 + c */
|
||||||
|
}
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
|
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
|
||||||
const uint8_t *data = (const uint8_t *)key;
|
const uint8_t *data = (const uint8_t *)key;
|
||||||
const int32_t nblocks = len >> 2u;
|
const int32_t nblocks = len >> 2u;
|
||||||
|
|
Loading…
Reference in New Issue