Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 96 additions & 70 deletions src/concurrency/data_race.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,20 @@ impl MemoryCellClocks {
/// not used previously as atomic memory.
fn load_acquire(
&mut self,
clocks: &mut ThreadClockSet,
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_read_detect(clocks, index)?;
self.atomic_read_detect(thread_clocks, index)?;
if let Some(atomic) = self.atomic() {
clocks.clock.join(&atomic.sync_vector);
thread_clocks.clock.join(&atomic.sync_vector);
}
Ok(())
}

/// Checks if the memory cell access is ordered with all prior atomic reads and writes
fn race_free_with_atomic(&self, clocks: &ThreadClockSet) -> bool {
fn race_free_with_atomic(&self, thread_clocks: &ThreadClockSet) -> bool {
if let Some(atomic) = self.atomic() {
atomic.read_vector <= clocks.clock && atomic.write_vector <= clocks.clock
atomic.read_vector <= thread_clocks.clock && atomic.write_vector <= thread_clocks.clock
} else {
true
}
Expand All @@ -299,81 +299,97 @@ impl MemoryCellClocks {
/// not used previously as atomic memory.
fn load_relaxed(
&mut self,
clocks: &mut ThreadClockSet,
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_read_detect(clocks, index)?;
self.atomic_read_detect(thread_clocks, index)?;
if let Some(atomic) = self.atomic() {
clocks.fence_acquire.join(&atomic.sync_vector);
thread_clocks.fence_acquire.join(&atomic.sync_vector);
}
Ok(())
}

/// Update the memory cell data-race tracking for atomic
/// store release semantics.
fn store_release(&mut self, clocks: &ThreadClockSet, index: VectorIdx) -> Result<(), DataRace> {
self.atomic_write_detect(clocks, index)?;
fn store_release(
&mut self,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index)?;
let atomic = self.atomic_mut();
atomic.sync_vector.clone_from(&clocks.clock);
atomic.sync_vector.clone_from(&thread_clocks.clock);
Ok(())
}

/// Update the memory cell data-race tracking for atomic
/// store relaxed semantics.
fn store_relaxed(&mut self, clocks: &ThreadClockSet, index: VectorIdx) -> Result<(), DataRace> {
self.atomic_write_detect(clocks, index)?;
fn store_relaxed(
&mut self,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index)?;

// The handling of release sequences was changed in C++20 and so
// the code here is different to the paper since now all relaxed
// stores block release sequences. The exception for same-thread
// relaxed stores has been removed.
let atomic = self.atomic_mut();
atomic.sync_vector.clone_from(&clocks.fence_release);
atomic.sync_vector.clone_from(&thread_clocks.fence_release);
Ok(())
}

/// Update the memory cell data-race tracking for atomic
/// store release semantics for RMW operations.
fn rmw_release(&mut self, clocks: &ThreadClockSet, index: VectorIdx) -> Result<(), DataRace> {
self.atomic_write_detect(clocks, index)?;
fn rmw_release(
&mut self,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index)?;
let atomic = self.atomic_mut();
atomic.sync_vector.join(&clocks.clock);
atomic.sync_vector.join(&thread_clocks.clock);
Ok(())
}

/// Update the memory cell data-race tracking for atomic
/// store relaxed semantics for RMW operations.
fn rmw_relaxed(&mut self, clocks: &ThreadClockSet, index: VectorIdx) -> Result<(), DataRace> {
self.atomic_write_detect(clocks, index)?;
fn rmw_relaxed(
&mut self,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index)?;
let atomic = self.atomic_mut();
atomic.sync_vector.join(&clocks.fence_release);
atomic.sync_vector.join(&thread_clocks.fence_release);
Ok(())
}

/// Detect data-races with an atomic read, caused by a non-atomic write that does
/// not happen-before the atomic-read.
fn atomic_read_detect(
&mut self,
clocks: &ThreadClockSet,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
log::trace!("Atomic read with vectors: {:#?} :: {:#?}", self, clocks);
log::trace!("Atomic read with vectors: {:#?} :: {:#?}", self, thread_clocks);
let atomic = self.atomic_mut();
atomic.read_vector.set_at_index(&clocks.clock, index);
if self.write <= clocks.clock[self.write_index] { Ok(()) } else { Err(DataRace) }
atomic.read_vector.set_at_index(&thread_clocks.clock, index);
if self.write <= thread_clocks.clock[self.write_index] { Ok(()) } else { Err(DataRace) }
}

/// Detect data-races with an atomic write, either with a non-atomic read or with
/// a non-atomic write.
fn atomic_write_detect(
&mut self,
clocks: &ThreadClockSet,
thread_clocks: &ThreadClockSet,
index: VectorIdx,
) -> Result<(), DataRace> {
log::trace!("Atomic write with vectors: {:#?} :: {:#?}", self, clocks);
log::trace!("Atomic write with vectors: {:#?} :: {:#?}", self, thread_clocks);
let atomic = self.atomic_mut();
atomic.write_vector.set_at_index(&clocks.clock, index);
if self.write <= clocks.clock[self.write_index] && self.read <= clocks.clock {
atomic.write_vector.set_at_index(&thread_clocks.clock, index);
if self.write <= thread_clocks.clock[self.write_index] && self.read <= thread_clocks.clock {
Ok(())
} else {
Err(DataRace)
Expand All @@ -384,21 +400,21 @@ impl MemoryCellClocks {
/// returns true if a data-race is detected.
fn read_race_detect(
&mut self,
clocks: &mut ThreadClockSet,
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
current_span: Span,
) -> Result<(), DataRace> {
log::trace!("Unsynchronized read with vectors: {:#?} :: {:#?}", self, clocks);
log::trace!("Unsynchronized read with vectors: {:#?} :: {:#?}", self, thread_clocks);
if !current_span.is_dummy() {
clocks.clock[index].span = current_span;
thread_clocks.clock[index].span = current_span;
}
if self.write <= clocks.clock[self.write_index] {
if self.write <= thread_clocks.clock[self.write_index] {
let race_free = if let Some(atomic) = self.atomic() {
atomic.write_vector <= clocks.clock
atomic.write_vector <= thread_clocks.clock
} else {
true
};
self.read.set_at_index(&clocks.clock, index);
self.read.set_at_index(&thread_clocks.clock, index);
if race_free { Ok(()) } else { Err(DataRace) }
} else {
Err(DataRace)
Expand All @@ -409,22 +425,23 @@ impl MemoryCellClocks {
/// returns true if a data-race is detected.
fn write_race_detect(
&mut self,
clocks: &mut ThreadClockSet,
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
write_type: WriteType,
current_span: Span,
) -> Result<(), DataRace> {
log::trace!("Unsynchronized write with vectors: {:#?} :: {:#?}", self, clocks);
log::trace!("Unsynchronized write with vectors: {:#?} :: {:#?}", self, thread_clocks);
if !current_span.is_dummy() {
clocks.clock[index].span = current_span;
thread_clocks.clock[index].span = current_span;
}
if self.write <= clocks.clock[self.write_index] && self.read <= clocks.clock {
if self.write <= thread_clocks.clock[self.write_index] && self.read <= thread_clocks.clock {
let race_free = if let Some(atomic) = self.atomic() {
atomic.write_vector <= clocks.clock && atomic.read_vector <= clocks.clock
atomic.write_vector <= thread_clocks.clock
&& atomic.read_vector <= thread_clocks.clock
} else {
true
};
self.write = clocks.clock[index];
self.write = thread_clocks.clock[index];
self.write_index = index;
self.write_type = write_type;
if race_free {
Expand Down Expand Up @@ -764,24 +781,24 @@ impl VClockAlloc {
fn report_data_race<'tcx>(
global: &GlobalState,
thread_mgr: &ThreadManager<'_, '_>,
range: &MemoryCellClocks,
mem_clocks: &MemoryCellClocks,
action: &str,
is_atomic: bool,
ptr_dbg: Pointer<AllocId>,
) -> InterpResult<'tcx> {
let (current_index, current_clocks) = global.current_thread_state(thread_mgr);
let write_clock;
let (other_action, other_thread, other_clock) = if range.write
> current_clocks.clock[range.write_index]
let (other_action, other_thread, other_clock) = if mem_clocks.write
> current_clocks.clock[mem_clocks.write_index]
{
// Convert the write action into the vector clock it
// represents for diagnostic purposes.
write_clock = VClock::new_with_index(range.write_index, range.write);
(range.write_type.get_descriptor(), range.write_index, &write_clock)
} else if let Some(idx) = Self::find_gt_index(&range.read, &current_clocks.clock) {
("Read", idx, &range.read)
write_clock = VClock::new_with_index(mem_clocks.write_index, mem_clocks.write);
(mem_clocks.write_type.get_descriptor(), mem_clocks.write_index, &write_clock)
} else if let Some(idx) = Self::find_gt_index(&mem_clocks.read, &current_clocks.clock) {
("Read", idx, &mem_clocks.read)
} else if !is_atomic {
if let Some(atomic) = range.atomic() {
if let Some(atomic) = mem_clocks.atomic() {
if let Some(idx) = Self::find_gt_index(&atomic.write_vector, &current_clocks.clock)
{
("Atomic Store", idx, &atomic.write_vector)
Expand Down Expand Up @@ -832,10 +849,10 @@ impl VClockAlloc {
thread_mgr: &ThreadManager<'_, '_>,
) -> bool {
if global.race_detecting() {
let (_, clocks) = global.current_thread_state(thread_mgr);
let (_, thread_clocks) = global.current_thread_state(thread_mgr);
let alloc_ranges = self.alloc_ranges.borrow();
for (_, range) in alloc_ranges.iter(range.start, range.size) {
if !range.race_free_with_atomic(&clocks) {
for (_, mem_clocks) in alloc_ranges.iter(range.start, range.size) {
if !mem_clocks.race_free_with_atomic(&thread_clocks) {
return false;
}
}
Expand All @@ -857,16 +874,18 @@ impl VClockAlloc {
let current_span = machine.current_span();
let global = machine.data_race.as_ref().unwrap();
if global.race_detecting() {
let (index, mut clocks) = global.current_thread_state_mut(&machine.threads);
let (index, mut thread_clocks) = global.current_thread_state_mut(&machine.threads);
let mut alloc_ranges = self.alloc_ranges.borrow_mut();
for (offset, range) in alloc_ranges.iter_mut(range.start, range.size) {
if let Err(DataRace) = range.read_race_detect(&mut clocks, index, current_span) {
drop(clocks);
for (offset, mem_clocks) in alloc_ranges.iter_mut(range.start, range.size) {
if let Err(DataRace) =
mem_clocks.read_race_detect(&mut thread_clocks, index, current_span)
{
drop(thread_clocks);
// Report data-race.
return Self::report_data_race(
global,
&machine.threads,
range,
mem_clocks,
"Read",
false,
Pointer::new(alloc_id, offset),
Expand All @@ -890,17 +909,22 @@ impl VClockAlloc {
let current_span = machine.current_span();
let global = machine.data_race.as_mut().unwrap();
if global.race_detecting() {
let (index, mut clocks) = global.current_thread_state_mut(&machine.threads);
for (offset, range) in self.alloc_ranges.get_mut().iter_mut(range.start, range.size) {
if let Err(DataRace) =
range.write_race_detect(&mut clocks, index, write_type, current_span)
{
drop(clocks);
let (index, mut thread_clocks) = global.current_thread_state_mut(&machine.threads);
for (offset, mem_clocks) in
self.alloc_ranges.get_mut().iter_mut(range.start, range.size)
{
if let Err(DataRace) = mem_clocks.write_race_detect(
&mut thread_clocks,
index,
write_type,
current_span,
) {
drop(thread_clocks);
// Report data-race
return Self::report_data_race(
global,
&machine.threads,
range,
mem_clocks,
write_type.get_descriptor(),
false,
Pointer::new(alloc_id, offset),
Expand Down Expand Up @@ -1125,16 +1149,17 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
data_race.maybe_perform_sync_operation(
&this.machine.threads,
current_span,
|index, mut clocks| {
for (offset, range) in
|index, mut thread_clocks| {
for (offset, mem_clocks) in
alloc_meta.alloc_ranges.borrow_mut().iter_mut(base_offset, size)
{
if let Err(DataRace) = op(range, &mut clocks, index, atomic) {
mem::drop(clocks);
if let Err(DataRace) = op(mem_clocks, &mut thread_clocks, index, atomic)
{
mem::drop(thread_clocks);
return VClockAlloc::report_data_race(
data_race,
&this.machine.threads,
range,
mem_clocks,
description,
true,
Pointer::new(alloc_id, offset),
Expand All @@ -1150,13 +1175,14 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {

// Log changes to atomic memory.
if log::log_enabled!(log::Level::Trace) {
for (_offset, range) in alloc_meta.alloc_ranges.borrow().iter(base_offset, size)
for (_offset, mem_clocks) in
alloc_meta.alloc_ranges.borrow().iter(base_offset, size)
{
log::trace!(
"Updated atomic memory({:?}, size={}) to {:#?}",
place.ptr,
size.bytes(),
range.atomic_ops
mem_clocks.atomic_ops
);
}
}
Expand Down
6 changes: 2 additions & 4 deletions tests/pass/concurrency/windows_join_multiple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ fn main() {
})
.into_raw_handle() as usize;

let waiter = move || {
unsafe {
assert_eq!(WaitForSingleObject(blocker, INFINITE), 0);
}
let waiter = move || unsafe {
assert_eq!(WaitForSingleObject(blocker, INFINITE), 0);
};

let waiter1 = thread::spawn(waiter);
Expand Down