Merge branch 'develop' into feature/slguan
This commit is contained in:
commit
5506dbe136
|
@ -169,7 +169,7 @@ SELECT function<field_name>,…
|
|||
|
||||
以温度传感器采集时序数据作为例,示范STable的使用。 在这个例子中,对每个温度计都会建立一张表,表名为温度计的ID,温度计读数的时刻记为ts,采集的值记为degree。通过tags给每个采集器打上不同的标签,其中记录温度计的地区和类型,以方便我们后面的查询。所有温度计的采集量都一样,因此我们用STable来定义表结构。
|
||||
|
||||
###定义STable表结构并使用它创建子表
|
||||
###1:定义STable表结构并使用它创建子表
|
||||
|
||||
创建STable语句如下:
|
||||
|
||||
|
@ -189,7 +189,7 @@ CREATE TABLE therm4 USING thermometer TAGS ('shanghai', 3);
|
|||
|
||||
其中therm1,therm2,therm3,therm4是超级表thermometer四个具体的子表,也即普通的Table。以therm1为例,它表示采集器therm1的数据,表结构完全由thermometer定义,标签location=”beijing”, type=1表示therm1的地区是北京,类型是第1类的温度计。
|
||||
|
||||
###写入数据
|
||||
###2:写入数据
|
||||
|
||||
注意,写入数据时不能直接对STable操作,而是要对每张子表进行操作。我们分别向四张表therm1,therm2, therm3, therm4写入一条数据,写入语句如下:
|
||||
|
||||
|
@ -200,7 +200,7 @@ INSERT INTO therm3 VALUES ('2018-01-01 00:00:00.000', 24);
|
|||
INSERT INTO therm4 VALUES ('2018-01-01 00:00:00.000', 23);
|
||||
```
|
||||
|
||||
### 按标签聚合查询
|
||||
###3:按标签聚合查询
|
||||
|
||||
查询位于北京(beijing)和天津(tianjing)两个地区的温度传感器采样值的数量count(*)、平均温度avg(degree)、最高温度max(degree)、最低温度min(degree),并将结果按所处地域(location)和传感器类型(type)进行聚合。
|
||||
|
||||
|
@ -211,7 +211,7 @@ WHERE location='beijing' or location='tianjin'
|
|||
GROUP BY location, type
|
||||
```
|
||||
|
||||
### 按时间周期聚合查询
|
||||
###4:按时间周期聚合查询
|
||||
|
||||
查询仅位于北京以外地区的温度传感器最近24小时(24h)采样值的数量count(*)、平均温度avg(degree)、最高温度max(degree)和最低温度min(degree),将采集结果按照10分钟为周期进行聚合,并将结果按所处地域(location)和传感器类型(type)再次进行聚合。
|
||||
|
||||
|
|
|
@ -172,6 +172,7 @@ int vnodeFreeCacheBlock(SCacheBlock *pCacheBlock) {
|
|||
SCachePool *pPool = (SCachePool *)vnodeList[pObj->vnode].pCachePool;
|
||||
if (pCacheBlock->notFree) {
|
||||
pPool->notFreeSlots--;
|
||||
pInfo->unCommittedBlocks--;
|
||||
dTrace("vid:%d sid:%d id:%s, cache block is not free, slot:%d, index:%d notFreeSlots:%d",
|
||||
pObj->vnode, pObj->sid, pObj->meterId, pCacheBlock->slot, pCacheBlock->index, pPool->notFreeSlots);
|
||||
}
|
||||
|
|
|
@ -479,7 +479,7 @@ int vnodeImportToFile(SImportInfo *pImport) {
|
|||
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
|
||||
}
|
||||
|
||||
// last slot, the uncommitted slots shall be shifted
|
||||
// last slot, the uncommitted slots shall be shifted, a cache block may have empty rows
|
||||
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
|
||||
int points = pCacheBlock->numOfPoints - pInfo->commitPoint;
|
||||
if (points > 0) {
|
||||
|
@ -568,7 +568,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
|
|||
}
|
||||
}
|
||||
|
||||
// copy the overwritten data into buffer
|
||||
// copy the overwritten data into buffer, merge cache blocks
|
||||
tpoints = rows;
|
||||
pos = pImport->pos;
|
||||
slot = pImport->slot;
|
||||
|
@ -603,6 +603,19 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
|
|||
pos = 0;
|
||||
tpoints -= points;
|
||||
|
||||
if (tpoints == 0) {
|
||||
// free the rest of cache blocks, since cache blocks are merged
|
||||
int currentSlot = slot;
|
||||
while (slot != pInfo->currentSlot) {
|
||||
slot = (slot + 1) % pInfo->maxBlocks;
|
||||
pCacheBlock = pInfo->cacheBlocks[slot];
|
||||
vnodeFreeCacheBlock(pCacheBlock);
|
||||
}
|
||||
|
||||
pInfo->currentSlot = currentSlot;
|
||||
slot = currentSlot; // make sure to exit from the while loop
|
||||
}
|
||||
|
||||
if (slot == pInfo->currentSlot) break;
|
||||
slot = (slot + 1) % pInfo->maxBlocks;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue