add updateExpiredWindow test case
This commit is contained in:
parent
693e8784cb
commit
1fbd79bafb
|
@ -276,6 +276,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro
|
|||
*/
|
||||
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
|
||||
|
||||
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -227,7 +227,35 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSRow *row = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->suid = htobe64(pBlock->suid);
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
|
|
|
@ -394,7 +394,8 @@ static STimeWindow getActiveTimeWindowX(int64_t ts, SInterval* pInterval) {
|
|||
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) {
|
||||
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
||||
if (pItem == NULL) {
|
||||
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
||||
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
|
||||
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state
|
||||
if (pItem == NULL) {
|
||||
// Response to stream computing: OOM
|
||||
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
|
||||
|
@ -419,6 +420,9 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
|
|||
free(pItem);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else if ((pItem = *(SSmaStatItem **)pItem) == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||
|
@ -491,41 +495,39 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
|
|||
TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);
|
||||
|
||||
|
||||
// basic procedure
|
||||
// TODO: optimization
|
||||
tsdbRefSmaStat(pTsdb, pStat);
|
||||
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
SInterval interval = {0};
|
||||
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// basic procedure
|
||||
// TODO: optimization
|
||||
tsdbRefSmaStat(pTsdb, pStat);
|
||||
|
||||
while (true) {
|
||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
int64_t suid = htobe64(pBlock->uid);
|
||||
STSmaWrapper *pSW = NULL;
|
||||
STSma *pTSma = NULL;
|
||||
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
|
||||
tdFreeTSmaWrapper(pSW);
|
||||
break;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
|
||||
tdFreeTSmaWrapper(pSW);
|
||||
break;
|
||||
}
|
||||
STSRow *row = tGetSubmitBlkNext(&blkIter);
|
||||
if (row == NULL) {
|
||||
tdFreeTSmaWrapper(pSW);
|
||||
break;
|
||||
}
|
||||
if(pSW == NULL) {
|
||||
if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), suid)) == NULL) {
|
||||
if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) {
|
||||
break;
|
||||
}
|
||||
if((pSW->number) <= 0 || (pSW->tSma == NULL)) {
|
||||
|
|
|
@ -367,15 +367,49 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
|
||||
ASSERT_NE(pTsdb->pTfs, nullptr);
|
||||
|
||||
char *msg = (char *)calloc(1, 100);
|
||||
ASSERT_NE(msg, nullptr);
|
||||
ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, msg), 0);
|
||||
// generate SSubmitReq msg and update expired window
|
||||
int16_t schemaVer = 0;
|
||||
uint32_t mockRowLen = sizeof(STSRow);
|
||||
uint32_t mockRowNum = 2;
|
||||
uint32_t mockBlkNum = 2;
|
||||
uint32_t msgLen = sizeof(SSubmitReq) + mockBlkNum * sizeof(SSubmitBlk) + mockBlkNum * mockRowNum * mockRowLen;
|
||||
|
||||
SSubmitReq *pMsg = (SSubmitReq *)calloc(1, msgLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
pMsg->version = htobe64(schemaVer);
|
||||
pMsg->numOfBlocks = htonl(mockBlkNum);
|
||||
pMsg->length = htonl(msgLen);
|
||||
|
||||
SSubmitBlk *pBlk = NULL;
|
||||
STSRow *pRow = NULL;
|
||||
TSKEY now = taosGetTimestamp(pTsdb->config.precision);
|
||||
|
||||
for (uint32_t b = 0; b < mockBlkNum; ++b) {
|
||||
pBlk = (SSubmitBlk *)POINTER_SHIFT(pMsg, sizeof(SSubmitReq) + b * (sizeof(SSubmitBlk) + mockRowNum * mockRowLen));
|
||||
pBlk->uid = htobe64(tbUid);
|
||||
pBlk->suid = htobe64(tbUid);
|
||||
pBlk->sversion = htonl(schemaVer);
|
||||
pBlk->padding = htonl(0);
|
||||
pBlk->schemaLen = htonl(0);
|
||||
pBlk->numOfRows = htons(mockRowNum);
|
||||
pBlk->dataLen = htonl(mockRowNum * mockRowLen);
|
||||
for (uint32_t r = 0; r < mockRowNum; ++r) {
|
||||
pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen);
|
||||
pRow->len = mockRowLen;
|
||||
pRow->ts = now + b * 1000 + r * 1000;
|
||||
pRow->sver = schemaVer;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS);
|
||||
|
||||
ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, (const char *)pMsg), 0);
|
||||
|
||||
// init
|
||||
int32_t allocCnt = 0;
|
||||
int32_t allocStep = 16384;
|
||||
int32_t buffer = 1024;
|
||||
void * buf = NULL;
|
||||
void *buf = NULL;
|
||||
ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
|
||||
int32_t bufSize = taosTSizeof(buf);
|
||||
int32_t numOfTables = 10;
|
||||
|
@ -421,36 +455,36 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData));
|
||||
len += tableDataLen;
|
||||
// printf("bufSize=%d, len=%d, len of table[%d]=%d\n", bufSize, len, t, tableDataLen);
|
||||
}
|
||||
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
|
||||
|
||||
ASSERT_GE(bufSize, pSmaData->dataLen);
|
||||
|
||||
// execute
|
||||
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||
|
||||
// step 3: query
|
||||
uint32_t checkDataCnt = 0;
|
||||
for (int32_t t = 0; t < numOfTables; ++t) {
|
||||
for (col_id_t c = 0; c < numOfCols; ++c) {
|
||||
ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
|
||||
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
|
||||
TSDB_CODE_SUCCESS);
|
||||
++checkDataCnt;
|
||||
}
|
||||
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
|
||||
|
||||
ASSERT_GE(bufSize, pSmaData->dataLen);
|
||||
|
||||
// execute
|
||||
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||
|
||||
// step 3: query
|
||||
uint32_t checkDataCnt = 0;
|
||||
for (int32_t t = 0; t < numOfTables; ++t) {
|
||||
for (col_id_t c = 0; c < numOfCols; ++c) {
|
||||
ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
|
||||
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
|
||||
TSDB_CODE_SUCCESS);
|
||||
++checkDataCnt;
|
||||
}
|
||||
}
|
||||
|
||||
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
|
||||
|
||||
// release data
|
||||
tfree(pMsg);
|
||||
taosTZfree(buf);
|
||||
// release meta
|
||||
tdDestroyTSma(&tSma);
|
||||
tfsClose(pTsdb->pTfs);
|
||||
tsdbClose(pTsdb);
|
||||
metaClose(pMeta);
|
||||
}
|
||||
|
||||
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
|
||||
|
||||
// release data
|
||||
tfree(msg);
|
||||
taosTZfree(buf);
|
||||
// release meta
|
||||
tdDestroyTSma(&tSma);
|
||||
tfsClose(pTsdb->pTfs);
|
||||
tsdbClose(pTsdb);
|
||||
metaClose(pMeta);
|
||||
}
|
||||
#endif
|
||||
|
||||
#pragma GCC diagnostic pop
|
Loading…
Reference in New Issue