chore: add check

This commit is contained in:
kailixu 2023-07-11 14:05:09 +08:00
parent d37a760655
commit 314123ef34
1 changed files with 27 additions and 0 deletions

View File

@ -845,6 +845,10 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
* @param suid
* @return int32_t
*/
static SHashObj* dupVerCheck = NULL;
static int8_t dupVerInit = 0;
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
@ -854,6 +858,29 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (!dupVerCheck) {
if (0 == atomic_val_compare_exchange_8(&dupVerInit, 0, 1)) {
dupVerCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (!dupVerCheck) ASSERT(0);
} else {
int32_t cnt = 0;
while (!dupVerCheck) {
++cnt;
if (cnt > 1000) {
sched_yield();
cnt = 0;
}
}
}
}
void *hashKey = NULL;
if ((hashKey = taosHashGet(dupVerCheck, &version, sizeof(version)))) {
ASSERT(0);
} else {
if (taosHashPut(dupVerCheck, &version, sizeof(version), NULL, 0) != 0) {
ASSERT(0);
}
}
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED;