[td-1373]

This commit is contained in:
Haojun Liao 2020-10-22 16:45:17 +08:00
parent ad499c5541
commit e06d7083c1
4 changed files with 10 additions and 15 deletions

View File

@ -761,9 +761,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pSupporter->pTSBuf = pBuf; pSupporter->pTSBuf = pBuf;
} else { } else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); tsBufMerge(pSupporter->pTSBuf, pBuf);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestroy(pBuf); tsBufDestroy(pBuf);
} }

View File

@ -111,7 +111,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
void* tsBufDestroy(STSBuf* pTSBuf); void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf); STSBuf* tsBufClone(STSBuf* pTSBuf);

View File

@ -715,7 +715,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
* @param vnodeId * @param vnodeId
* @return * @return
*/ */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) { if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
return 0; return 0;
} }
@ -725,14 +725,13 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
} }
// src can only have one vnode index // src can only have one vnode index
if (pSrcBuf->numOfVnodes > 1) { assert(pSrcBuf->numOfVnodes == 1);
return -1;
}
// there are data in buffer, flush to disk first // there are data in buffer, flush to disk first
tsBufFlush(pDestBuf); tsBufFlush(pDestBuf);
// compared with the last vnode id // compared with the last vnode id
int32_t vnodeId = tsBufGetLastVnodeInfo((STSBuf*) pSrcBuf)->info.vnode;
if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
int32_t oldSize = pDestBuf->numOfVnodes; int32_t oldSize = pDestBuf->numOfVnodes;
int32_t newSize = oldSize + pSrcBuf->numOfVnodes; int32_t newSize = oldSize + pSrcBuf->numOfVnodes;

View File

@ -416,8 +416,8 @@ void mergeDiffVnodeBufferTest() {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
t.i64Key = i; t.i64Key = i;
tsBufAppend(pTSBuf1, 0, &t, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf1, 1, &t, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf2, 0, &t, (const char*)list, num * sizeof(int64_t)); tsBufAppend(pTSBuf2, 9, &t, (const char*)list, num * sizeof(int64_t));
free(list); free(list);
@ -426,7 +426,7 @@ void mergeDiffVnodeBufferTest() {
tsBufFlush(pTSBuf2); tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2, 9); tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2); EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
@ -459,8 +459,6 @@ void mergeIdenticalVnodeBufferTest() {
start += step * num; start += step * num;
} }
for (int32_t i = numOfTags; i < numOfTags * 2; ++i) { for (int32_t i = numOfTags; i < numOfTags * 2; ++i) {
int64_t* list = createTsList(num, start, step); int64_t* list = createTsList(num, start, step);
@ -473,7 +471,7 @@ void mergeIdenticalVnodeBufferTest() {
tsBufFlush(pTSBuf2); tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2, 12); tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 1); EXPECT_EQ(pTSBuf1->numOfVnodes, 1);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);