diff --git a/include/os/osAtomic.h b/include/os/osAtomic.h index 48b7b8c56f..97bb6f53ba 100644 --- a/include/os/osAtomic.h +++ b/include/os/osAtomic.h @@ -114,6 +114,12 @@ int32_t atomic_fetch_xor_32(int32_t volatile *ptr, int32_t val); int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val); void *atomic_fetch_xor_ptr(void *ptr, void *val); +#ifdef _MSC_VER +#define tmemory_barrier(order) MemoryBarrier() +#else +#define tmemory_barrier(order) __sync_synchronize() +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 06dd43e170..631a92f1be 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -93,6 +93,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn goto _error; } + tmemory_barrier(); SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index); @@ -428,6 +429,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } } + tmemory_barrier(); pSourceDataInfo->status = EX_SOURCE_DATA_READY; code = tsem_post(&pExchangeInfo->ready); if (code != TSDB_CODE_SUCCESS) {