chore: more check
This commit is contained in:
parent
314123ef34
commit
7c85f7701d
|
@ -1911,7 +1911,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char dupKey[40];
|
char dupKey[50];
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
@ -1957,10 +1957,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
|
||||||
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
|
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
|
||||||
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var});
|
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var});
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
snprintf(dupKey, 40, "%" PRIi64 ":%" PRIi64, *(TSKEY*)var, blkVer);
|
snprintf(dupKey, 50, "%d:%" PRIi64 ":%" PRIi64, vgId, *(TSKEY*)var, blkVer);
|
||||||
uInfo("%s:%d key:ver: %s, tags: %s", __func__, __LINE__, dupKey, tag);
|
uInfo("%s:%d key:ver: %s, tags: %s", __func__, __LINE__, dupKey, tag);
|
||||||
int32_t dupKeyLen = strlen(dupKey);
|
int32_t dupKeyLen = strlen(dupKey);
|
||||||
assert(dupKeyLen < 40);
|
assert(dupKeyLen < 50);
|
||||||
void* hashKey = NULL;
|
void* hashKey = NULL;
|
||||||
if ((hashKey = taosHashGet(dupCheck, &dupKey, dupKeyLen + 1))) {
|
if ((hashKey = taosHashGet(dupCheck, &dupKey, dupKeyLen + 1))) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -858,9 +858,10 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
char dupKey[40];
|
||||||
if (!dupVerCheck) {
|
if (!dupVerCheck) {
|
||||||
if (0 == atomic_val_compare_exchange_8(&dupVerInit, 0, 1)) {
|
if (0 == atomic_val_compare_exchange_8(&dupVerInit, 0, 1)) {
|
||||||
dupVerCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
dupVerCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (!dupVerCheck) ASSERT(0);
|
if (!dupVerCheck) ASSERT(0);
|
||||||
} else {
|
} else {
|
||||||
int32_t cnt = 0;
|
int32_t cnt = 0;
|
||||||
|
@ -873,14 +874,19 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
snprintf(dupKey, 40, "%d:%" PRIi64 ":%" PRIi64, SMA_VID(pSma), version);
|
||||||
|
int32_t dupKeyLen = strlen(dupKey);
|
||||||
|
assert(dupKeyLen < 40);
|
||||||
void *hashKey = NULL;
|
void *hashKey = NULL;
|
||||||
if ((hashKey = taosHashGet(dupVerCheck, &version, sizeof(version)))) {
|
if ((hashKey = taosHashGet(dupVerCheck, &dupKey, dupKeyLen + 1))) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
} else {
|
} else {
|
||||||
if (taosHashPut(dupVerCheck, &version, sizeof(version), NULL, 0) != 0) {
|
if (taosHashPut(dupVerCheck, &dupKey, dupKeyLen + 1, NULL, 0) != 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
Loading…
Reference in New Issue