mark expired query window by SSmaStat
This commit is contained in:
parent
aad01e0155
commit
a455105de3
|
@ -86,7 +86,12 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWin
|
||||||
|
|
||||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
// TODO: lock and create when each put, or create during tsdbNew.
|
|
||||||
|
if (*pSmaStat != NULL) { // no lock
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew.
|
||||||
if (*pSmaStat == NULL) {
|
if (*pSmaStat == NULL) {
|
||||||
*pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat));
|
*pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat));
|
||||||
if (*pSmaStat == NULL) {
|
if (*pSmaStat == NULL) {
|
||||||
|
@ -149,6 +154,8 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbInitSmaStat(&pTsdb->pSmaStat); // lazy mode
|
||||||
|
|
||||||
// TODO: decode the msg => start
|
// TODO: decode the msg => start
|
||||||
const char * indexName = SMA_TEST_INDEX_NAME;
|
const char * indexName = SMA_TEST_INDEX_NAME;
|
||||||
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
||||||
|
@ -157,8 +164,8 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
expiredWindows[i] = now + i;
|
expiredWindows[i] = now + i;
|
||||||
}
|
}
|
||||||
// TODO: decode the msg <= end
|
|
||||||
|
|
||||||
|
// TODO: decode the msg <= end
|
||||||
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems;
|
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems;
|
||||||
|
|
||||||
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName));
|
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName));
|
||||||
|
@ -181,8 +188,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||||
// If error occurs during put expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would tell
|
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
||||||
// query module to query raw TS data.
|
// tell query module to query raw TS data.
|
||||||
|
// N.B.
|
||||||
|
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
|
||||||
|
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
|
||||||
|
// windows failed to put into hash table.
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
taosHashRemove(pItemsHash, indexName, sizeof(indexName));
|
taosHashRemove(pItemsHash, indexName, sizeof(indexName));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -612,6 +623,22 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) {
|
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) {
|
||||||
|
const char *indexName = param->indexName;
|
||||||
|
|
||||||
|
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName));
|
||||||
|
if (pItem == NULL) {
|
||||||
|
// mark all window as expired and notify query module to query raw TS data.
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nQueryWin = 0;
|
||||||
|
for (int32_t n = 0; n < nQueryWin; ++n) {
|
||||||
|
TSKEY thisWindow = n;
|
||||||
|
if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) {
|
||||||
|
// TODO: mark this window as expired.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
STSmaReadH tReadH = {0};
|
STSmaReadH tReadH = {0};
|
||||||
tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData);
|
tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue