Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
60271605ff
|
@ -256,18 +256,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
||||||
// tsdbFS.c ==============================================================================================
|
// tsdbFS.c ==============================================================================================
|
||||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
|
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
|
||||||
int32_t tsdbFSClose(STsdb *pTsdb);
|
int32_t tsdbFSClose(STsdb *pTsdb);
|
||||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
|
|
||||||
void tsdbFSDestroy(STsdbFS *pFS);
|
|
||||||
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
|
|
||||||
int32_t tsdbFSCommit(STsdb *pTsdb);
|
|
||||||
int32_t tsdbFSRollback(STsdb *pTsdb);
|
|
||||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
|
|
||||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
|
||||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
|
|
||||||
void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
||||||
|
|
||||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
|
|
||||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
|
|
||||||
// tsdbReaderWriter.c ==============================================================================================
|
// tsdbReaderWriter.c ==============================================================================================
|
||||||
// SDataFWriter
|
// SDataFWriter
|
||||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
||||||
|
@ -737,7 +726,6 @@ struct STsdbReadSnap {
|
||||||
SMemTable *pIMem;
|
SMemTable *pIMem;
|
||||||
SQueryNode *pINode;
|
SQueryNode *pINode;
|
||||||
TFileSetArray *pfSetArray;
|
TFileSetArray *pfSetArray;
|
||||||
STsdbFS fs;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDataFWriter {
|
struct SDataFWriter {
|
||||||
|
|
|
@ -216,8 +216,6 @@ int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||||
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
||||||
// int32_t tsdbFinishCommit(STsdb* pTsdb);
|
|
||||||
// int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
||||||
|
|
|
@ -1292,10 +1292,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
// check if the checkpoint msg already sent or not.
|
// check if the checkpoint msg already sent or not.
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
|
|
||||||
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
||||||
" already received, ignore this msg and continue process checkpoint",
|
" transId:%d already received, ignore this msg and continue process checkpoint",
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointingId);
|
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
|
@ -1183,8 +1183,10 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
|
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
||||||
if (pLastCol->dirty && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (pLastCol->dirty) {
|
||||||
pLastCol->dirty = 0;
|
pLastCol->dirty = 0;
|
||||||
|
}
|
||||||
|
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
|
||||||
erase = true;
|
erase = true;
|
||||||
}
|
}
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
@ -1197,8 +1199,10 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen);
|
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
||||||
if (pLastCol->dirty && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (pLastCol->dirty) {
|
||||||
pLastCol->dirty = 0;
|
pLastCol->dirty = 0;
|
||||||
|
}
|
||||||
|
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
|
||||||
erase = true;
|
erase = true;
|
||||||
}
|
}
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -702,7 +702,7 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
// EXPOSED APIS ====================================================================================
|
// EXPOSED APIS ====================================================================================
|
||||||
int32_t tsdbFSCommit(STsdb *pTsdb) {
|
static int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STsdbFS fs = {0};
|
STsdbFS fs = {0};
|
||||||
|
@ -738,7 +738,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSRollback(STsdb *pTsdb) {
|
static int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -833,312 +833,3 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
pFS->pDelFile = NULL;
|
|
||||||
if (pFS->aDFileSet) {
|
|
||||||
taosArrayClear(pFS->aDFileSet);
|
|
||||||
} else {
|
|
||||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
|
||||||
if (pFS->aDFileSet == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTsdb->fs.pDelFile) {
|
|
||||||
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
|
||||||
if (pFS->pDelFile == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pFS->pDelFile = *pTsdb->fs.pDelFile;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
|
||||||
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
|
|
||||||
|
|
||||||
// head
|
|
||||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
|
||||||
if (fSet.pHeadF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
*fSet.pHeadF = *pSet->pHeadF;
|
|
||||||
|
|
||||||
// data
|
|
||||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
|
||||||
if (fSet.pDataF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
*fSet.pDataF = *pSet->pDataF;
|
|
||||||
|
|
||||||
// sma
|
|
||||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
|
||||||
if (fSet.pSmaF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
*fSet.pSmaF = *pSet->pSmaF;
|
|
||||||
|
|
||||||
// stt
|
|
||||||
for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) {
|
|
||||||
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (fSet.aSttF[fSet.nSttF] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
if (pDelFile) {
|
|
||||||
if (pFS->pDelFile == NULL) {
|
|
||||||
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
|
||||||
if (pFS->pDelFile == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*pFS->pDelFile = *pDelFile;
|
|
||||||
} else {
|
|
||||||
if (pFS->pDelFile) {
|
|
||||||
taosMemoryFree(pFS->pDelFile);
|
|
||||||
pFS->pDelFile = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t idx = taosArraySearchIdx(pFS->aDFileSet, pSet, tDFileSetCmprFn, TD_GE);
|
|
||||||
|
|
||||||
if (idx < 0) {
|
|
||||||
idx = taosArrayGetSize(pFS->aDFileSet);
|
|
||||||
} else {
|
|
||||||
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, idx);
|
|
||||||
int32_t c = tDFileSetCmprFn(pSet, pDFileSet);
|
|
||||||
if (c == 0) {
|
|
||||||
*pDFileSet->pHeadF = *pSet->pHeadF;
|
|
||||||
*pDFileSet->pDataF = *pSet->pDataF;
|
|
||||||
*pDFileSet->pSmaF = *pSet->pSmaF;
|
|
||||||
// stt
|
|
||||||
if (pSet->nSttF > pDFileSet->nSttF) {
|
|
||||||
ASSERT(pSet->nSttF == pDFileSet->nSttF + 1);
|
|
||||||
|
|
||||||
pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1];
|
|
||||||
pDFileSet->nSttF++;
|
|
||||||
} else if (pSet->nSttF < pDFileSet->nSttF) {
|
|
||||||
ASSERT(pSet->nSttF == 1);
|
|
||||||
for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) {
|
|
||||||
taosMemoryFree(pDFileSet->aSttF[iStt]);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pDFileSet->aSttF[0] = *pSet->aSttF[0];
|
|
||||||
pDFileSet->nSttF = 1;
|
|
||||||
} else {
|
|
||||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
|
||||||
*pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pDFileSet->diskId = pSet->diskId;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pSet->nSttF == 1);
|
|
||||||
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1};
|
|
||||||
|
|
||||||
// head
|
|
||||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
|
||||||
if (fSet.pHeadF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*fSet.pHeadF = *pSet->pHeadF;
|
|
||||||
|
|
||||||
// data
|
|
||||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
|
||||||
if (fSet.pDataF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*fSet.pDataF = *pSet->pDataF;
|
|
||||||
|
|
||||||
// sma
|
|
||||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
|
||||||
if (fSet.pSmaF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*fSet.pSmaF = *pSet->pSmaF;
|
|
||||||
|
|
||||||
// stt
|
|
||||||
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (fSet.aSttF[0] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*fSet.aSttF[0] = *pSet->aSttF[0];
|
|
||||||
|
|
||||||
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
char tfname[TSDB_FILENAME_LEN];
|
|
||||||
|
|
||||||
tsdbGetCurrentFName(pTsdb, NULL, tfname);
|
|
||||||
|
|
||||||
// gnrt CURRENT.t
|
|
||||||
code = tsdbSaveFSToFile(pFSNew, tfname);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t nRef;
|
|
||||||
|
|
||||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
|
||||||
if (pFS->aDFileSet == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
pFS->pDelFile = pTsdb->fs.pDelFile;
|
|
||||||
if (pFS->pDelFile) {
|
|
||||||
nRef = atomic_fetch_add_32(&pFS->pDelFile->nRef, 1);
|
|
||||||
ASSERT(nRef > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
SDFileSet fSet;
|
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
|
||||||
fSet = *pSet;
|
|
||||||
|
|
||||||
nRef = atomic_fetch_add_32(&pSet->pHeadF->nRef, 1);
|
|
||||||
ASSERT(nRef > 0);
|
|
||||||
|
|
||||||
nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
|
|
||||||
ASSERT(nRef > 0);
|
|
||||||
|
|
||||||
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
|
|
||||||
ASSERT(nRef > 0);
|
|
||||||
|
|
||||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
|
||||||
nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1);
|
|
||||||
ASSERT(nRef > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
|
|
||||||
int32_t nRef;
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
|
||||||
|
|
||||||
if (pFS->pDelFile) {
|
|
||||||
nRef = atomic_sub_fetch_32(&pFS->pDelFile->nRef, 1);
|
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbDelFileName(pTsdb, pFS->pDelFile, fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pFS->pDelFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
|
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet);
|
|
||||||
|
|
||||||
// head
|
|
||||||
nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
|
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSet->pHeadF);
|
|
||||||
}
|
|
||||||
|
|
||||||
// data
|
|
||||||
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
|
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSet->pDataF);
|
|
||||||
}
|
|
||||||
|
|
||||||
// sma
|
|
||||||
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
|
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSet->pSmaF);
|
|
||||||
}
|
|
||||||
|
|
||||||
// stt
|
|
||||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
|
||||||
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
|
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
|
||||||
(void)taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSet->aSttF[iStt]);
|
|
||||||
/* code */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pFS->aDFileSet);
|
|
||||||
}
|
|
||||||
|
|
|
@ -1325,8 +1325,8 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
|
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
|
||||||
" - %" PRId64 ", uid:%" PRIu64 ", %s",
|
" - %" PRId64 ", uid:%" PRIu64 ", %s",
|
||||||
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
|
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid,
|
||||||
pBlockScanInfo->uid, pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.buildmemBlock += el;
|
pReader->cost.buildmemBlock += el;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1390,13 +1390,9 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) {
|
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
|
||||||
tMergeTreePinSttBlock(&pSttBlockReader->mergeTree);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) {
|
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
|
||||||
tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
|
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
|
||||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
||||||
|
@ -1535,8 +1531,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
|
@ -1585,8 +1580,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
|
@ -1889,8 +1883,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == ik.ts) {
|
if (minKey == ik.ts) {
|
||||||
|
@ -1948,8 +1941,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
|
@ -2150,7 +2142,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
}
|
}
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey;
|
pScanInfo->sttKeyInfo.nextProcKey =
|
||||||
|
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
||||||
hasData = true;
|
hasData = true;
|
||||||
} else {
|
} else {
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||||
|
@ -2168,9 +2161,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
return hasData;
|
return hasData;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) {
|
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
|
||||||
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
||||||
|
@ -2225,7 +2216,8 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
STsdbReader* pReader) {
|
||||||
bool copied = false;
|
bool copied = false;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
@ -2923,7 +2915,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
|
static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr);
|
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid,
|
||||||
|
pReader->idStr);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -3847,7 +3840,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
|
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// no data in buffer, return immediately
|
// no data in buffer, return immediately
|
||||||
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
|
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
|
||||||
break;
|
break;
|
||||||
|
@ -4375,8 +4367,8 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s",
|
tsdbTrace("block from file rows: %" PRId64 ", will process pre-file set buffer: %d. %s", pBlock->info.rows,
|
||||||
pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr);
|
pStatus->bProcMemFirstFileset, pReader->idStr);
|
||||||
if (pStatus->bProcMemPreFileset) {
|
if (pStatus->bProcMemPreFileset) {
|
||||||
if (pBlock->info.rows > 0) {
|
if (pBlock->info.rows > 0) {
|
||||||
if (pReader->notifyFn) {
|
if (pReader->notifyFn) {
|
||||||
|
@ -5144,7 +5136,6 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
|
||||||
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
|
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbFSUnref(pTsdb, &pSnap->fs);
|
|
||||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||||
|
|
||||||
|
@ -5165,9 +5156,7 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSetFilesetDelimited(STsdbReader* pReader) {
|
void tsdbSetFilesetDelimited(STsdbReader* pReader) { pReader->bFilesetDelimited = true; }
|
||||||
pReader->bFilesetDelimited = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
||||||
pReader->notifyFn = notifyFn;
|
pReader->notifyFn = notifyFn;
|
||||||
|
|
|
@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
||||||
taosArrayClear(pTask->pReadyMsgList);
|
taosArrayClear(pTask->pReadyMsgList);
|
||||||
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
// todo other thread may change the status
|
// todo other thread may change the status
|
||||||
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
||||||
if (type == STREAM_INPUT__CHECKPOINT) {
|
if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
|
|
||||||
|
// todo add lock
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
ETaskStatus s = streamTaskGetStatus(pTask, &p);
|
||||||
|
if (s == TASK_STATUS__CK) {
|
||||||
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
|
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
|
||||||
streamTaskBuildCheckpoint(pTask);
|
streamTaskBuildCheckpoint(pTask);
|
||||||
|
} else {
|
||||||
|
// todo refactor
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
|
} else {
|
||||||
|
code = streamTaskSendCheckpointReadyMsg(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// todo: let's retry send rsp to upstream/mnode
|
||||||
|
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr,
|
||||||
|
0, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -518,6 +518,7 @@ e
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py
|
||||||
|
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
from util import constant
|
||||||
|
from util.common import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
"""Verify the jira TS-4382
|
||||||
|
"""
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
self.dbname = 'db'
|
||||||
|
self.stbname = 'st'
|
||||||
|
self.ctbname_list = ["ct1", "ct2"]
|
||||||
|
self.tag_value_list = ['{"instance":"100"}', '{"instance":"200"}']
|
||||||
|
|
||||||
|
def prepareData(self):
|
||||||
|
# db
|
||||||
|
tdSql.execute("create database {};".format(self.dbname))
|
||||||
|
tdSql.execute("use {};".format(self.dbname))
|
||||||
|
tdLog.debug("Create database %s" % self.dbname)
|
||||||
|
|
||||||
|
# super table
|
||||||
|
tdSql.execute("create table {} (ts timestamp, col1 int) tags (t1 json);".format(self.stbname))
|
||||||
|
tdLog.debug("Create super table %s" % self.stbname)
|
||||||
|
|
||||||
|
# child table
|
||||||
|
for i in range(len(self.ctbname_list)):
|
||||||
|
tdSql.execute("create table {} using {} tags('{}');".format(self.ctbname_list[i], self.stbname, self.tag_value_list[i]))
|
||||||
|
tdLog.debug("Create child table %s" % self.ctbname_list)
|
||||||
|
|
||||||
|
# insert data
|
||||||
|
tdSql.execute("insert into {} values(now, 1)(now+1s, 2)".format(self.ctbname_list[0]))
|
||||||
|
tdSql.execute("insert into {} values(now, null)(now+1s, null)".format(self.ctbname_list[1]))
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.prepareData()
|
||||||
|
sql_list = [
|
||||||
|
# super table query with correct tag name of json type
|
||||||
|
{
|
||||||
|
"sql": "select to_char(ts, 'yyyy-mm-dd hh24:mi:ss') as time, irate(col1) from st group by to_char(ts, 'yyyy-mm-dd hh24:mi:ss'), t1->'instance' order by time;",
|
||||||
|
"result_check": "0.0"
|
||||||
|
},
|
||||||
|
# child table query with incorrect tag name of json type
|
||||||
|
{
|
||||||
|
"sql": "select to_char(ts, 'yyyy-mm-dd hh24:mi:ss') as time, irate(col1) from ct1 group by to_char(ts, 'yyyy-mm-dd hh24:mi:ss'), t1->'name' order by time;",
|
||||||
|
"result_check": "None"
|
||||||
|
},
|
||||||
|
# child table query with null value
|
||||||
|
{
|
||||||
|
"sql": "select ts, avg(col1) from ct2 group by ts, t1->'name' order by ts;",
|
||||||
|
"result_check": "None"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
for sql_dic in sql_list:
|
||||||
|
tdSql.query(sql_dic["sql"])
|
||||||
|
tdLog.debug("execute sql: %s" % sql_dic["sql"])
|
||||||
|
for item in [row[1] for row in tdSql.queryResult]:
|
||||||
|
if sql_dic["result_check"] in str(item):
|
||||||
|
tdLog.debug("Check query result of '{}' successfully".format(sql_dic["sql"]))
|
||||||
|
break
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
Loading…
Reference in New Issue