fix(stream): check the pk during merge rows with identical timestamp.
This commit is contained in:
parent
9ed74085d8
commit
df516703e1
|
@ -324,6 +324,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
|
||||||
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
|
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
|
||||||
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
|
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
|
||||||
int32_t newLen = taosArrayGetSize(pNew->aRowP);
|
int32_t newLen = taosArrayGetSize(pNew->aRowP);
|
||||||
|
int32_t numOfPk = 0;
|
||||||
|
|
||||||
int32_t j = 0, k = 0;
|
int32_t j = 0, k = 0;
|
||||||
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
|
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
|
||||||
|
@ -335,17 +336,40 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
||||||
while (j < newLen && k < oldLen) {
|
while (j < newLen && k < oldLen) {
|
||||||
SRow* pNewRow = taosArrayGetP(pNew->aRowP, j);
|
SRow* pNewRow = taosArrayGetP(pNew->aRowP, j);
|
||||||
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
|
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
|
||||||
if (pNewRow->ts <= pOldRow->ts) {
|
if (pNewRow->ts < pOldRow->ts) {
|
||||||
taosArrayPush(pFinal, &pNewRow);
|
taosArrayPush(pFinal, &pNewRow);
|
||||||
j += 1;
|
j += 1;
|
||||||
|
} else if (pNewRow->ts > pOldRow->ts) {
|
||||||
if (pNewRow->ts == pOldRow->ts) {
|
|
||||||
k += 1;
|
|
||||||
tRowDestroy(pOldRow);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArrayPush(pFinal, &pOldRow);
|
taosArrayPush(pFinal, &pOldRow);
|
||||||
k += 1;
|
k += 1;
|
||||||
|
} else {
|
||||||
|
// check for the existance of primary key
|
||||||
|
if (pNewRow->numOfPKs == 0) {
|
||||||
|
taosArrayPush(pFinal, &pNewRow);
|
||||||
|
k += 1;
|
||||||
|
j += 1;
|
||||||
|
tRowDestroy(pOldRow);
|
||||||
|
} else {
|
||||||
|
numOfPk = pNewRow->numOfPKs;
|
||||||
|
|
||||||
|
SRowKey kNew, kOld;
|
||||||
|
tRowGetKey(pNewRow, &kNew);
|
||||||
|
tRowGetKey(pOldRow, &kOld);
|
||||||
|
|
||||||
|
int32_t ret = tRowKeyCompare(&kNew, &kOld);
|
||||||
|
if (ret <= 0) {
|
||||||
|
taosArrayPush(pFinal, &pNewRow);
|
||||||
|
j += 1;
|
||||||
|
|
||||||
|
if (ret == 0) {
|
||||||
|
k += 1;
|
||||||
|
tRowDestroy(pOldRow);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayPush(pFinal, &pOldRow);
|
||||||
|
k += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,8 +387,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
||||||
taosArrayDestroy(pExisted->aRowP);
|
taosArrayDestroy(pExisted->aRowP);
|
||||||
pExisted->aRowP = pFinal;
|
pExisted->aRowP = pFinal;
|
||||||
|
|
||||||
tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
|
tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d",
|
||||||
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
id, (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
||||||
(pNew->pCreateTbReq != NULL));
|
(pNew->pCreateTbReq != NULL));
|
||||||
|
|
||||||
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
||||||
|
|
Loading…
Reference in New Issue