more work
This commit is contained in:
parent
3c18269c29
commit
d59b0254d3
|
@ -109,8 +109,8 @@ typedef struct SQueryTableDataCond {
|
||||||
int32_t type; // data block load type:
|
int32_t type; // data block load type:
|
||||||
int32_t numOfTWindows;
|
int32_t numOfTWindows;
|
||||||
STimeWindow* twindows;
|
STimeWindow* twindows;
|
||||||
int32_t startVersion;
|
int64_t startVersion;
|
||||||
int32_t endVersion;
|
int64_t endVersion;
|
||||||
} SQueryTableDataCond;
|
} SQueryTableDataCond;
|
||||||
|
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
|
@ -119,7 +119,6 @@ typedef void *STsdbReader;
|
||||||
STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
bool isTsdbCacheLastRow(STsdbReader *pReader);
|
|
||||||
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
||||||
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
|
|
|
@ -138,6 +138,8 @@ void tsdbFree(uint8_t *pBuf);
|
||||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
||||||
|
|
||||||
|
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
|
||||||
|
|
||||||
// SMapData
|
// SMapData
|
||||||
void tMapDataReset(SMapData *pMapData);
|
void tMapDataReset(SMapData *pMapData);
|
||||||
void tMapDataClear(SMapData *pMapData);
|
void tMapDataClear(SMapData *pMapData);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -458,3 +458,109 @@ int32_t tsdbColDataBlockAppend(SColDataBlock *pColDataBlock, TSDBROW *pRow, STSc
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete skyline ======================================================
|
||||||
|
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t i1 = 0;
|
||||||
|
int32_t n1 = taosArrayGetSize(aSkyline1);
|
||||||
|
int32_t i2 = 0;
|
||||||
|
int32_t n2 = taosArrayGetSize(aSkyline2);
|
||||||
|
TSDBKEY *pSkyline1;
|
||||||
|
TSDBKEY *pSkyline2;
|
||||||
|
TSDBKEY item;
|
||||||
|
int64_t version1 = 0;
|
||||||
|
int64_t version2 = 0;
|
||||||
|
|
||||||
|
ASSERT(n1 > 0 && n2 > 0);
|
||||||
|
|
||||||
|
taosArrayClear(aSkyline);
|
||||||
|
|
||||||
|
while (i1 < n1 && i2 < n2) {
|
||||||
|
pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1);
|
||||||
|
pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2);
|
||||||
|
|
||||||
|
if (pSkyline1->ts < pSkyline2->ts) {
|
||||||
|
version1 = pSkyline1->version;
|
||||||
|
i1++;
|
||||||
|
} else if (pSkyline1->ts > pSkyline2->ts) {
|
||||||
|
version2 = pSkyline2->version;
|
||||||
|
i2++;
|
||||||
|
} else {
|
||||||
|
version1 = pSkyline1->version;
|
||||||
|
version2 = pSkyline2->version;
|
||||||
|
i1++;
|
||||||
|
i2++;
|
||||||
|
}
|
||||||
|
|
||||||
|
item.ts = TMIN(pSkyline1->ts, pSkyline2->ts);
|
||||||
|
item.version = TMAX(version1, version2);
|
||||||
|
if (taosArrayPush(aSkyline, &item) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (i1 < n1) {
|
||||||
|
pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1);
|
||||||
|
item.ts = pSkyline1->ts;
|
||||||
|
item.version = pSkyline1->version;
|
||||||
|
if (taosArrayPush(aSkyline, &item) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
i1++;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (i2 < n2) {
|
||||||
|
pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2);
|
||||||
|
item.ts = pSkyline2->ts;
|
||||||
|
item.version = pSkyline2->version;
|
||||||
|
if (taosArrayPush(aSkyline, &item) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
i2++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SDelData *pDelData;
|
||||||
|
int32_t midx;
|
||||||
|
|
||||||
|
taosArrayClear(aSkyline);
|
||||||
|
if (sidx == eidx) {
|
||||||
|
pDelData = (SDelData *)taosArrayGet(aDelData, sidx);
|
||||||
|
taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
|
||||||
|
taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
|
||||||
|
} else {
|
||||||
|
SArray *aSkyline1 = NULL;
|
||||||
|
SArray *aSkyline2 = NULL;
|
||||||
|
|
||||||
|
aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY));
|
||||||
|
aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY));
|
||||||
|
if (aSkyline1 == NULL || aSkyline2 == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _clear;
|
||||||
|
}
|
||||||
|
|
||||||
|
midx = (sidx + eidx) / 2;
|
||||||
|
|
||||||
|
code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
|
||||||
|
if (code) goto _clear;
|
||||||
|
|
||||||
|
code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2);
|
||||||
|
if (code) goto _clear;
|
||||||
|
|
||||||
|
code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline);
|
||||||
|
|
||||||
|
_clear:
|
||||||
|
taosArrayDestroy(aSkyline1);
|
||||||
|
taosArrayDestroy(aSkyline2);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
Loading…
Reference in New Issue