enh(stream): reduce table scan
This commit is contained in:
parent
bea3b35b2e
commit
2ced6c280c
|
@ -25,33 +25,34 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SUpdateInfo {
|
typedef struct SUpdateInfo {
|
||||||
SArray *pTsBuckets;
|
SArray *pTsBuckets;
|
||||||
uint64_t numBuckets;
|
uint64_t numBuckets;
|
||||||
SArray *pTsSBFs;
|
SArray *pTsSBFs;
|
||||||
uint64_t numSBFs;
|
uint64_t numSBFs;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
TSKEY minTS;
|
TSKEY minTS;
|
||||||
SScalableBf* pCloseWinSBF;
|
SScalableBf *pCloseWinSBF;
|
||||||
SHashObj* pMap;
|
SHashObj *pMap;
|
||||||
STimeWindow scanWindow;
|
STimeWindow scanWindow;
|
||||||
uint64_t scanGroupId;
|
uint64_t scanGroupId;
|
||||||
uint64_t maxVersion;
|
uint64_t maxVersion;
|
||||||
} SUpdateInfo;
|
} SUpdateInfo;
|
||||||
|
|
||||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
||||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||||
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
|
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||||
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
|
||||||
|
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
||||||
|
|
|
@ -56,8 +56,8 @@ struct tmq_conf_t {
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int8_t resetOffset;
|
int8_t resetOffset;
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
int8_t ssEnable;
|
int8_t snapEnable;
|
||||||
int32_t ssBatchSize;
|
int32_t snapBatchSize;
|
||||||
|
|
||||||
bool hbBgEnable;
|
bool hbBgEnable;
|
||||||
|
|
||||||
|
@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
|
|
||||||
if (strcmp(key, "experimental.snapshot.enable") == 0) {
|
if (strcmp(key, "experimental.snapshot.enable") == 0) {
|
||||||
if (strcmp(value, "true") == 0) {
|
if (strcmp(value, "true") == 0) {
|
||||||
conf->ssEnable = true;
|
conf->snapEnable = true;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
} else if (strcmp(value, "false") == 0) {
|
} else if (strcmp(value, "false") == 0) {
|
||||||
conf->ssEnable = false;
|
conf->snapEnable = false;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
} else {
|
} else {
|
||||||
return TMQ_CONF_INVALID;
|
return TMQ_CONF_INVALID;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
|
||||||
|
conf->snapBatchSize = atoi(value);
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
if (strcmp(key, "enable.heartbeat.background") == 0) {
|
if (strcmp(key, "enable.heartbeat.background") == 0) {
|
||||||
if (strcmp(value, "true") == 0) {
|
if (strcmp(value, "true") == 0) {
|
||||||
conf->hbBgEnable = true;
|
conf->hbBgEnable = true;
|
||||||
|
@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
|
|
||||||
conf->ssBatchSize = atoi(value);
|
|
||||||
return TMQ_CONF_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strcmp(key, "td.connect.ip") == 0) {
|
if (strcmp(key, "td.connect.ip") == 0) {
|
||||||
conf->ip = strdup(value);
|
conf->ip = strdup(value);
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
|
@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
pTmq->withTbName = conf->withTbName;
|
pTmq->withTbName = conf->withTbName;
|
||||||
pTmq->useSnapshot = conf->ssEnable;
|
pTmq->useSnapshot = conf->snapEnable;
|
||||||
pTmq->autoCommit = conf->autoCommit;
|
pTmq->autoCommit = conf->autoCommit;
|
||||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||||
pTmq->commitCb = conf->commitCb;
|
pTmq->commitCb = conf->commitCb;
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
#include "os.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
SResultRowPosition* p1 =
|
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
|
||||||
(SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||||
|
|
||||||
if (p1 == NULL) {
|
if (p1 == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat
|
||||||
|
|
||||||
// todo move to the initialization function
|
// todo move to the initialization function
|
||||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||||
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
|
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
|
||||||
|
|
||||||
filterFreeInfo(filter);
|
filterFreeInfo(filter);
|
||||||
return keep;
|
return keep;
|
||||||
|
@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
loadSMA = true; // mark the operation of load sma;
|
loadSMA = true; // mark the operation of load sma;
|
||||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
||||||
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
|
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
|
||||||
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
||||||
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
|
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
|
||||||
} else { // todo opt for json tag
|
} else { // todo opt for json tag
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i, data, false);
|
colDataAppend(pColInfoData, i, data, false);
|
||||||
}
|
}
|
||||||
|
@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo));
|
qDebug(
|
||||||
|
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
|
||||||
|
"due to query func required",
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo;
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
bool isClosed = false;
|
bool isClosed = false;
|
||||||
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
|
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
|
||||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||||
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
||||||
}
|
}
|
||||||
|
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
|
||||||
// must check update info first.
|
// must check update info first.
|
||||||
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
||||||
if ((update || (isSignleIntervalWindow(pInfo) && isClosed &&
|
bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
|
||||||
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
|
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
|
||||||
|
if ((update || closedWin) && out) {
|
||||||
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
|
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
return pSDB;
|
return pSDB;
|
||||||
|
@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, &block);
|
setBlockIntoRes(pInfo, &block);
|
||||||
|
|
||||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
|
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
|
||||||
|
pInfo->pRes->info.version)) {
|
||||||
printDataBlock(pInfo->pRes, "stream scan ignore");
|
printDataBlock(pInfo->pRes, "stream scan ignore");
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
continue;
|
continue;
|
||||||
|
@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
// build message and send to mnode to fetch the content of system tables.
|
// build message and send to mnode to fetch the content of system tables.
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
char dbName[TSDB_DB_NAME_LEN] = {0};
|
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (pInfo->showRewrite) {
|
if (pInfo->showRewrite) {
|
||||||
|
@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
return sysTableScanUserTables(pOperator);
|
return sysTableScanUserTables(pOperator);
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
return sysTableScanUserTags(pOperator);
|
return sysTableScanUserTags(pOperator);
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 &&
|
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
|
||||||
pInfo->showRewrite && IS_SYS_DBNAME(dbName)) {
|
IS_SYS_DBNAME(dbName)) {
|
||||||
return sysTableScanUserSTables(pOperator);
|
return sysTableScanUserSTables(pOperator);
|
||||||
} else { // load the meta from mnode of the given epset
|
} else { // load the meta from mnode of the given epset
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pColMatchInfo);
|
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2597,7 +2603,6 @@ _error:
|
||||||
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
const char* idStr) {
|
const char* idStr) {
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
|
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
|
||||||
|
@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st1 = taosGetTimestampUs();
|
int64_t st1 = taosGetTimestampUs();
|
||||||
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr);
|
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
||||||
|
|
||||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||||
qDebug("no table qualified for query, %s" PRIx64, idStr);
|
qDebug("no table qualified for query, %s" PRIx64, idStr);
|
||||||
|
@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st2 = taosGetTimestampUs();
|
int64_t st2 = taosGetTimestampUs();
|
||||||
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr);
|
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,33 +13,31 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tstreamUpdate.h"
|
|
||||||
#include "tencode.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "tencode.h"
|
||||||
|
#include "tstreamUpdate.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
#define DEFAULT_FALSE_POSITIVE 0.01
|
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||||
#define DEFAULT_BUCKET_SIZE 1310720
|
#define DEFAULT_BUCKET_SIZE 1310720
|
||||||
#define DEFAULT_MAP_CAPACITY 1310720
|
#define DEFAULT_MAP_CAPACITY 1310720
|
||||||
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
|
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
|
||||||
#define ROWS_PER_MILLISECOND 1
|
#define ROWS_PER_MILLISECOND 1
|
||||||
#define MAX_NUM_SCALABLE_BF 100000
|
#define MAX_NUM_SCALABLE_BF 100000
|
||||||
#define MIN_NUM_SCALABLE_BF 10
|
#define MIN_NUM_SCALABLE_BF 10
|
||||||
#define DEFAULT_PREADD_BUCKET 1
|
#define DEFAULT_PREADD_BUCKET 1
|
||||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||||
#define DEFAULT_EXPECTED_ENTRIES 10000
|
#define DEFAULT_EXPECTED_ENTRIES 10000
|
||||||
|
|
||||||
static int64_t adjustExpEntries(int64_t entries) {
|
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
|
||||||
return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||||
if (pInfo->numSBFs < count) {
|
if (pInfo->numSBFs < count) {
|
||||||
count = pInfo->numSBFs;
|
count = pInfo->numSBFs;
|
||||||
}
|
}
|
||||||
for (uint64_t i = 0; i < count; ++i) {
|
for (uint64_t i = 0; i < count; ++i) {
|
||||||
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
||||||
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
|
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
|
||||||
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||||
}
|
}
|
||||||
|
@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
||||||
|
|
||||||
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
||||||
if (watermark <= adjInterval) {
|
if (watermark <= adjInterval) {
|
||||||
watermark = TMAX(originInt/adjInterval, 1) * adjInterval;
|
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
||||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||||
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
|
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
|
||||||
|
@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
|
||||||
|
void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||||
|
if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
int32_t res = TSDB_CODE_FAILED;
|
int32_t res = TSDB_CODE_FAILED;
|
||||||
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||||
if (ts < maxTs - pInfo->watermark) {
|
if (ts < maxTs - pInfo->watermark) {
|
||||||
// this window has been closed.
|
// this window has been closed.
|
||||||
if (pInfo->pCloseWinSBF) {
|
if (pInfo->pCloseWinSBF) {
|
||||||
|
@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||||
if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
||||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( !pMapMaxTs && maxTs < ts ) {
|
if (!pMapMaxTs && maxTs < ts) {
|
||||||
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ts < pInfo->minTS) {
|
if (ts < pInfo->minTS) {
|
||||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
|
||||||
|
maxTs, *pMapMaxTs, ts);
|
||||||
return true;
|
return true;
|
||||||
} else if (res == TSDB_CODE_SUCCESS) {
|
} else if (res == TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
|
||||||
|
maxTs, *pMapMaxTs, ts);
|
||||||
// check from tsdb api
|
// check from tsdb api
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
||||||
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||||
|
pWin->skey, pWin->ekey, version);
|
||||||
pInfo->scanWindow = *pWin;
|
pInfo->scanWindow = *pWin;
|
||||||
pInfo->scanGroupId = groupId;
|
pInfo->scanGroupId = groupId;
|
||||||
pInfo->maxVersion = version;
|
pInfo->maxVersion = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||||
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
|
pWin->skey, pWin->ekey, version);
|
||||||
pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) {
|
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
|
||||||
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
version <= pInfo->maxVersion) {
|
||||||
|
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||||
|
pWin->skey, pWin->ekey, version);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
||||||
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
|
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
|
||||||
if (tEncodeI32(&encoder, size) < 0) return -1;
|
if (tEncodeI32(&encoder, size) < 0) return -1;
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
|
TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i);
|
||||||
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
|
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
||||||
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
|
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
|
||||||
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
|
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
|
||||||
for (int32_t i = 0; i < sBfSize; i++) {
|
for (int32_t i = 0; i < sBfSize; i++) {
|
||||||
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
|
SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||||
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
|
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
||||||
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
|
||||||
|
|
||||||
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
|
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
|
||||||
|
|
||||||
int32_t mapSize = taosHashGetSize(pInfo->pMap);
|
int32_t mapSize = taosHashGetSize(pInfo->pMap);
|
||||||
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
|
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
|
||||||
void* pIte = NULL;
|
void *pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
|
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
|
||||||
void* key = taosHashGetKey(pIte, &keyLen);
|
void *key = taosHashGetKey(pIte, &keyLen);
|
||||||
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
|
if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1;
|
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
|
||||||
|
@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
||||||
|
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
if (tDecodeI32(&decoder, &size) < 0) return -1;
|
if (tDecodeI32(&decoder, &size) < 0) return -1;
|
||||||
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
|
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
|
||||||
TSKEY ts = INT64_MIN;
|
TSKEY ts = INT64_MIN;
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||||
|
@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
||||||
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
|
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
|
||||||
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
|
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
|
||||||
for (int32_t i = 0; i < sBfSize; i++) {
|
for (int32_t i = 0; i < sBfSize; i++) {
|
||||||
SScalableBf* pSBf = tScalableBfDecode(&decoder);
|
SScalableBf *pSBf = tScalableBfDecode(&decoder);
|
||||||
if (!pSBf) return -1;
|
if (!pSBf) return -1;
|
||||||
taosArrayPush(pInfo->pTsSBFs, &pSBf);
|
taosArrayPush(pInfo->pTsSBFs, &pSBf);
|
||||||
}
|
}
|
||||||
|
@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
||||||
|
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
|
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||||
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
|
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
ts = INT64_MIN;
|
ts = INT64_MIN;
|
||||||
for(int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
if (tDecodeU64(&decoder, &uid) < 0) return -1;
|
if (tDecodeU64(&decoder, &uid) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||||
|
|
Loading…
Reference in New Issue