Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
0da9cdafac
|
@ -56,6 +56,8 @@ If there are other TSMA created based on the TSMA being deleted, the delete oper
|
|||
## TSMA Calculation
|
||||
The calculation result of TSMA is a super table in the same database as the original table, but it is not visible to users. It cannot be deleted and will be automatically deleted when `DROP TSMA` is executed. The calculation of TSMA is done through stream computing, which is a background asynchronous process. The calculation result of TSMA is not guaranteed to be real-time, but it can guarantee eventual correctness.
|
||||
|
||||
If there is no data in the original subtable, the corresponding output subtable may not be created. Therefore, in count queries, even if `countAlwaysReturnValue` is configured, the result of this subtable will not be returned.
|
||||
|
||||
When there is a large amount of historical data, after creating TSMA, the stream computing will first calculate the historical data. During this period, newly created TSMA will not be used. The calculation will be automatically recalculated when data updates, deletions, or expired data arrive. During the recalculation period, the TSMA query results are not guaranteed to be real-time. If you want to query real-time data, you can use the hint `/*+ skip_tsma() */` in the SQL statement or disable the `querySmaOptimize` parameter to query from the original data.
|
||||
|
||||
## Using and Limitations of TSMA
|
||||
|
|
|
@ -206,11 +206,11 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
|
|||
|
||||
| Attribute | Description |
|
||||
| ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Applicable | Server only |
|
||||
| Applicable | Server and Client |
|
||||
| Meaning | count()/hyperloglog() return value or not if the input data is empty or NULL |
|
||||
| Vlue Range | 0: Return empty line, 1: Return 0 |
|
||||
| Value Range | 0: Return empty line, 1: Return 0 |
|
||||
| Default | 1 |
|
||||
| Notes | When this parameter is setting to 1, for queries containing GROUP BY, PARTITION BY and INTERVAL clause, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values |
|
||||
| Notes | When this parameter is setting to 1, for queries containing INTERVAL clause or the queries using TSMA, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values. Server and client use the same value|
|
||||
|
||||
### maxNumOfDistinctRes
|
||||
|
||||
|
|
|
@ -55,6 +55,8 @@ DROP TSMA [db_name.]tsma_name;
|
|||
## TSMA的计算
|
||||
TSMA的计算结果为与原始表相同库下的一张超级表, 此表用户不可见. 不可删除, 在`DROP TSMA`时自动删除. TSMA的计算是通过流计算完成的, 此过程为后台异步过程, TSMA的计算结果不保证实时性, 但可以保证最终正确性.
|
||||
|
||||
TSMA计算时若原始子表内没有数据, 则可能不会创建对应的输出子表, 因此在count查询中, 即使配置了`countAlwaysReturnValue`, 也不会返回该表的结果.
|
||||
|
||||
当存在大量历史数据时, 创建TSMA之后, 流计算将会首先计算历史数据, 此期间新创建的TSMA不会被使用. 数据更新删除或者过期数据到来时自动重新计算影响部分数据。 在重新计算期间 TSMA 查询结果不保证实时性。若希望查询实时数据, 可以通过在 SQL 中添加 hint `/*+ skip_tsma() */` 或者关闭参数`querySmaOptimize`从原始数据查询。
|
||||
|
||||
## TSMA的使用与限制
|
||||
|
|
|
@ -224,11 +224,11 @@ taos -C
|
|||
|
||||
| 属性 | 说明 |
|
||||
| -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| 适用范围 | 仅服务端适用 |
|
||||
| 适用范围 | 服务端和客户端适用 |
|
||||
| 含义 | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值 |
|
||||
| 取值范围 | 0:返回空行,1:返回 0 |
|
||||
| 缺省值 | 1 |
|
||||
| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 |
|
||||
| 补充说明 | 该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. |
|
||||
|
||||
### multiResultFunctionStarReturnTags
|
||||
|
||||
|
|
|
@ -250,7 +250,7 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n);
|
|||
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||
|
||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc);
|
||||
|
||||
SSDataBlock* createDataBlock();
|
||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
|
||||
#define MALLOC_ALIGN_BYTES 32
|
||||
|
||||
static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc);
|
||||
|
||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
if (pColumnInfoData->reassigned) {
|
||||
|
@ -1563,24 +1565,28 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||
blockDataCleanup(dst);
|
||||
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
|
||||
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
|
||||
blockDataCleanup(pDst);
|
||||
|
||||
int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
|
||||
size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
|
||||
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
|
||||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
|
||||
colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
|
||||
}
|
||||
|
||||
uint32_t cap = dst->info.capacity;
|
||||
dst->info = src->info;
|
||||
dst->info.capacity = cap;
|
||||
uint32_t cap = pDst->info.capacity;
|
||||
|
||||
pDst->info = pSrc->info;
|
||||
copyPkVal(&pDst->info, &pSrc->info);
|
||||
|
||||
pDst->info.capacity = cap;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1663,62 +1669,68 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
|
||||
if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// prepare the pk buffer if needed
|
||||
SValue* p = &pDst->pks[0];
|
||||
|
||||
p->type = pDst->pks[0].type;
|
||||
p->pData = taosMemoryCalloc(1, pDst->pks[0].nData);
|
||||
p->nData = pDst->pks[0].nData;
|
||||
memcpy(p->pData, pDst->pks[0].pData, p->nData);
|
||||
|
||||
p = &pDst->pks[1];
|
||||
p->type = pDst->pks[1].type;
|
||||
p->pData = taosMemoryCalloc(1, pDst->pks[1].nData);
|
||||
p->nData = pDst->pks[1].nData;
|
||||
memcpy(p->pData, pDst->pks[1].pData, p->nData);
|
||||
}
|
||||
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||
if (pDataBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = createDataBlock();
|
||||
pBlock->info = pDataBlock->info;
|
||||
SSDataBlock* pDstBlock = createDataBlock();
|
||||
pDstBlock->info = pDataBlock->info;
|
||||
|
||||
pBlock->info.rows = 0;
|
||||
pBlock->info.capacity = 0;
|
||||
pBlock->info.rowSize = 0;
|
||||
pBlock->info.id = pDataBlock->info.id;
|
||||
pBlock->info.blankFill = pDataBlock->info.blankFill;
|
||||
pDstBlock->info.rows = 0;
|
||||
pDstBlock->info.capacity = 0;
|
||||
pDstBlock->info.rowSize = 0;
|
||||
pDstBlock->info.id = pDataBlock->info.id;
|
||||
pDstBlock->info.blankFill = pDataBlock->info.blankFill;
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
|
||||
blockDataAppendColInfo(pBlock, &colInfo);
|
||||
blockDataAppendColInfo(pDstBlock, &colInfo);
|
||||
}
|
||||
|
||||
// prepare the pk buffer if necessary
|
||||
if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) {
|
||||
SValue* pVal = &pBlock->info.pks[0];
|
||||
|
||||
pVal->type = pDataBlock->info.pks[0].type;
|
||||
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
|
||||
pVal->nData = pDataBlock->info.pks[0].nData;
|
||||
memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData);
|
||||
|
||||
SValue* p = &pBlock->info.pks[1];
|
||||
p->type = pDataBlock->info.pks[1].type;
|
||||
p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
|
||||
p->nData = pDataBlock->info.pks[1].nData;
|
||||
memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData);
|
||||
}
|
||||
copyPkVal(&pDstBlock->info, &pDataBlock->info);
|
||||
|
||||
if (copyData) {
|
||||
int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
|
||||
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
blockDataDestroy(pBlock);
|
||||
blockDataDestroy(pDstBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
|
||||
}
|
||||
|
||||
pBlock->info.rows = pDataBlock->info.rows;
|
||||
pBlock->info.capacity = pDataBlock->info.rows;
|
||||
pDstBlock->info.rows = pDataBlock->info.rows;
|
||||
pDstBlock->info.capacity = pDataBlock->info.rows;
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
return pDstBlock;
|
||||
}
|
||||
|
||||
SSDataBlock* createDataBlock() {
|
||||
|
|
|
@ -66,7 +66,7 @@ static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
int32_t code = tmsgSendReq(pEpSet, pMsg);
|
||||
int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
|
|
@ -136,6 +136,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key);
|
|||
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
|
||||
|
||||
STqOffsetStore* tqOffsetOpen(STQ* pTq);
|
||||
int32_t tqMetaTransform(STQ* pTq);
|
||||
void tqOffsetClose(STqOffsetStore*);
|
||||
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
|
||||
|
|
|
@ -86,15 +86,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
}
|
||||
|
||||
int32_t tqInitialize(STQ* pTq) {
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
|
@ -102,6 +93,19 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
|
||||
if (tqMetaTransform(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -378,6 +374,146 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
|||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TXN* txn;
|
||||
if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pKey = NULL;
|
||||
int kLen = 0;
|
||||
void* pVal = NULL;
|
||||
int vLen = 0;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) {
|
||||
tqError("transform sub info error");
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
|
||||
if (tdbCommit(pMetaDB, txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pMetaDB, txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqMetaTransform(STQ* pTq) {
|
||||
int32_t len = strlen(pTq->path) + 64;
|
||||
char* maindb = taosMemoryCalloc(1, len);
|
||||
sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME);
|
||||
|
||||
if(!taosCheckExistFile(maindb)){
|
||||
taosMemoryFree(maindb);
|
||||
char* tpath = taosMemoryCalloc(1, len);
|
||||
if(tpath == NULL){
|
||||
return -1;
|
||||
}
|
||||
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
|
||||
taosMemoryFree(pTq->path);
|
||||
pTq->path = tpath;
|
||||
return tqMetaOpen(pTq);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
TDB* pMetaDB = NULL;
|
||||
TTB* pExecStore = NULL;
|
||||
TTB* pCheckStore = NULL;
|
||||
char* offsetNew = NULL;
|
||||
char* offset = tqOffsetBuildFName(pTq->path, 0);
|
||||
if(offset == NULL){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
|
||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) {
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) {
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) {
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
char* tpath = taosMemoryCalloc(1, len);
|
||||
if(tpath == NULL){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
|
||||
taosMemoryFree(pTq->path);
|
||||
pTq->path = tpath;
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
tdbTbClose(pExecStore);
|
||||
pExecStore = NULL;
|
||||
tdbTbClose(pCheckStore);
|
||||
pCheckStore = NULL;
|
||||
tdbClose(pMetaDB);
|
||||
pMetaDB = NULL;
|
||||
|
||||
offsetNew = tqOffsetBuildFName(pTq->path, 0);
|
||||
if(offsetNew == NULL){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){
|
||||
tqError("copy offset file error");
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
taosRemoveFile(maindb);
|
||||
taosRemoveFile(offset);
|
||||
|
||||
END:
|
||||
taosMemoryFree(maindb);
|
||||
taosMemoryFree(offset);
|
||||
taosMemoryFree(offsetNew);
|
||||
|
||||
tdbTbClose(pExecStore);
|
||||
tdbTbClose(pCheckStore);
|
||||
tdbClose(pMetaDB);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
//int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
// int code = 0;
|
||||
// TBC* pCur = NULL;
|
||||
|
|
|
@ -25,6 +25,10 @@ struct STqOffsetStore {
|
|||
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
|
||||
int32_t len = strlen(path);
|
||||
char* fname = taosMemoryCalloc(1, len + 40);
|
||||
if(fname == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
|
||||
return fname;
|
||||
}
|
||||
|
|
|
@ -6041,7 +6041,7 @@ static EDealRes tsmaOptTagCheck(SNode* pNode, void* pContext) {
|
|||
STsmaOptTagCheckCtx* pCtx = pContext;
|
||||
for (int32_t i = 0; i < pCtx->pTsma->pTags->size; ++i) {
|
||||
SSchema* pSchema = taosArrayGet(pCtx->pTsma->pTags, i);
|
||||
if (pSchema->colId == pCol->colId) {
|
||||
if (strcmp(pSchema->name, pCol->colName) == 0) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ typedef struct {
|
|||
|
||||
SStreamTask* pTask;
|
||||
int64_t dbRefId;
|
||||
void* pMeta;
|
||||
} SAsyncUploadArg;
|
||||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
|
@ -437,7 +438,7 @@ int32_t uploadCheckpointData(void* param) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
|
||||
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, ((SStreamMeta*)arg->pMeta)->bkdChkptMgt, arg->chkpId,
|
||||
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
|
||||
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
|
||||
}
|
||||
|
@ -489,6 +490,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha
|
|||
arg->chkpId = chkpId;
|
||||
arg->pTask = pTask;
|
||||
arg->dbRefId = taskGetDBRef(pTask->pBackend);
|
||||
arg->pMeta = pTask->pMeta;
|
||||
|
||||
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
|
||||
}
|
||||
|
|
|
@ -2475,7 +2475,12 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
|
|||
}
|
||||
|
||||
int tdbBtcClose(SBTC *pBtc) {
|
||||
if (pBtc->iPage < 0) return 0;
|
||||
if (pBtc->iPage < 0) {
|
||||
if (pBtc->freeTxn) {
|
||||
tdbTxnClose(pBtc->pTxn);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (NULL == pBtc->pPage) {
|
||||
|
|
|
@ -257,7 +257,18 @@ class TDTestCase:
|
|||
os.system(f'taos -f {sql_file}')
|
||||
tdSql.query('select count(c_1) from d2.t2 where c_1 < 10', queryTimes=1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.execute('drop database d2')
|
||||
tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname', queryTimes=1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(0, 2, 't3')
|
||||
|
||||
tdSql.checkData(1, 0, 15)
|
||||
tdSql.checkData(1, 1, 1471617148940980000)
|
||||
tdSql.checkData(1, 2, 't2')
|
||||
|
||||
tdSql.checkData(2, 0, 0)
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(2, 2, 't1')
|
||||
|
||||
def run(self):
|
||||
self.test_count_with_sma_data()
|
||||
|
|
|
@ -1064,6 +1064,12 @@ class TDTestCase:
|
|||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999')
|
||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc())
|
||||
|
||||
sql = f"SELECT avg(c1), avg(c2) FROM {db_name}.meters WHERE ts >= '2018-09-17 09:00:00.009' AND ts < '2018-09-17 10:23:19.665' PARTITION BY t5 INTERVAL(30m)"
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:29:59.999')
|
||||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999')
|
||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc())
|
||||
|
||||
sql = f"select avg(c1), avg(c2) from {db_name}.meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m, 25m) SLIDING(10m)"
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:04:59.999')
|
||||
|
|
Loading…
Reference in New Issue