fix exchange operator memory access problem
This commit is contained in:
parent
812777a664
commit
6fcd071144
|
@ -114,6 +114,12 @@ int32_t atomic_fetch_xor_32(int32_t volatile *ptr, int32_t val);
|
||||||
int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val);
|
int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val);
|
||||||
void *atomic_fetch_xor_ptr(void *ptr, void *val);
|
void *atomic_fetch_xor_ptr(void *ptr, void *val);
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
#define tmemory_barrier(order) MemoryBarrier()
|
||||||
|
#else
|
||||||
|
#define tmemory_barrier(order) __sync_synchronize()
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -93,6 +93,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmemory_barrier();
|
||||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
|
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
|
||||||
|
|
||||||
|
@ -428,6 +429,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmemory_barrier();
|
||||||
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
||||||
code = tsem_post(&pExchangeInfo->ready);
|
code = tsem_post(&pExchangeInfo->ready);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue