Skip to content

[Observation] Initial implementation of Observed for transactional tracked values over time #79817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 10, 2025

Conversation

phausler
Copy link
Contributor

@phausler phausler commented Mar 6, 2025

This is an implementation for the feature swiftlang/swift-evolution#2726

@phausler phausler requested a review from a team as a code owner March 20, 2025 22:08
@phausler
Copy link
Contributor Author

@swift-ci please test

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

// there are two versions;
// either the tracking has never yet started at all and we need to prime the pump
// or the tracking has already started and we are going to await a change
if State.startTracking(state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe not of interest for an initial implementation, but it occurred to me that we could perhaps reduce the number of lock acquisitions by merging the tracking logic and id generation. we could maybe even even get away with dropping the separate storage field for the tracking flag too if we reserved a sentinel id value to mean 'not yet tracking'.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might be possible but it would have to be refactored very carefully because it does need a unique id before the suspension due to cancellation. The other option would be to use a custom token that passes in a pointer to the current stack location... which I would sincerely hope is unique, and since it does not overstep the asynchronous function we don't have any sort of risk of it being used beyond the frame, plus there is no indirection, just identity.

That optimization is future work but perhaps something worth looking into. However the importance of which is perhaps not huge since the lock acquire should be rather un-contenteded and when it is has a VERY short execution time.

Copy link
Contributor

@jamieQ jamieQ May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer relevant with the latest changes.

edit: oops, thought i could resolve this, but maybe not


// install a willChange continuation into the set of continuations
// this must take a locally unique id (to the active calls of next)
static func willChange(_ state: _ManagedCriticalState<State>, id: Int) async {
Copy link
Contributor

@jamieQ jamieQ Mar 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this also needs to get an appropriate isolated parameter, probably analogous to trackEmission(). otherwise it will presumably suspend when called from next() and that can (will?) break the iteration logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not believe so, that not having the isolation is actually behaving exactly as intended - else it wouldn't enqueue at the right edge of the scheduling (using the isolation here would practically speaking potentially skip an actual change).

Copy link
Contributor

@jamieQ jamieQ Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is awaited from next(), which executes on the iterator's isolation, then under the current language semantics, since it is nonisolated and async, won't it generally suspend so that this runs off the calling actor (if present)? why would we want this code to run on the global executor? if the observation tracking fires before we form and register the continuation, don't we just end up in a broken state?


edit: upon reflection, i see my earlier comment was perhaps ambiguous & confusing regarding the suggestion to change things to be like trackEmission(), since there are 2 methods with that name. i meant that this function should probably have an isolated parameter to ensure it runs on the iterator's isolation (not the observed source isolation).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current isolations and calls should be a guarantee for that; but I could see that adding that might future proof it so that if the careful balance isn't maintained in the future then it would retain the same correct behavior; that is a decent refinement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current isolations and calls should be a guarantee for that

i tested the prototype out, and it appears they are not. if you update the prototype's implementation of next() to something like this:

    public mutating func next(isolation iterationIsolation: isolated (any Actor)? = #isolation) async throws(Failure) -> Element? {
      guard let state else { return nil }
      let id = State.generation(state)
      do {
        if State.startTracking(state) {
          return try await trackEmission(isolation: iterationIsolation, state: state, id: id)
        } else {
          // alias isolation to demonstrate the issue with actor hopping
          let isolationAlias = iterationIsolation
          await withTaskCancellationHandler {
            // N.B. this closure is currently nonisolated since it does not capture the isolated parameter
            // isolationAlias?.assertIsolated("not isolated in next()")

            // and even if we capture isolation in this closure, the call to `State.willChange()` will hop to the global executor
            // iterationIsolation?.assertIsolated("now we're isolated")
            await State.willChange(state, id: id)
          } onCancel: {
            State.cancel(state, id: id)
          }
          return try await trackEmission(isolation: iterationIsolation, state: state, id: id)
        }
      } catch {
        return try terminate(throwing: error, id: id)
      }
    }
  }

then, assuming you're iterating on an actor, if you comment out the first isolation assertion (via the alias), it will crash. if you comment out the second one, it will pass (as the isolated parameter is then directly captured via the closure), but the call to State.willChange() will then cross an isolation boundary (you can confirm by iterating from the main actor and adding an isolation assertion into that method).

the output of this example in godbolt (read & un-comment some of the commented-out parts) can be used to see the issue. the example also highlights the more general concern with how changes to tracked properties occurring between a read of the Observed closure's value and the installation of the next 'will change' continuation can cause the sequence to effectively break.

Copy link
Contributor Author

@phausler phausler Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me that means that withTaskCancellationHandler needs to correctly forward it's isolation - that is highly unexpected if that is the case: @hborla do we need to fix withTaskCancellationHandler to address that?

slight correction:
I think it should be invoking this one:

public func withTaskCancellationHandler<T>(operation: () async throws -> T, onCancel handler: @Sendable () -> Void, isolation: isolated (any Actor)? = #isolation) async rethrows -> T

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a fix for this locally... but it is kinda nasty and I can ship the fix if absolutely necessary but it seems to be a failure more generally than this

func next(isolation: isolated (any Actor)? = #isolation) async {
    MainActor.assertIsolated() // passes
    await withTaskCancellationHandler {
        MainActor.assertIsolated() // fails!
    } onCancel: { }
}

@MainActor
func callOnMain() async {
  await next()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me that means that withTaskCancellationHandler needs to correctly forward it's isolation - that is highly unexpected if that is the case

i agree that it is surprising – this behavior was discussed somewhat recently in this forum post. i believe even if you explicitly pass the isolation as a parameter, unless the closure also captures it (directly, not via an alias), then it will (under the current language semantics) run on the concurrent executor since it will be nonisolated and async. this change enables the assertion to pass:

func next(isolation: isolated (any Actor)? = #isolation) async {
    MainActor.assertIsolated() // passes
    await withTaskCancellationHandler {
        _ = isolation
        MainActor.assertIsolated() // passes
    } onCancel: { }
}

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

1 similar comment
@phausler
Copy link
Contributor Author

@swift-ci please smoke test

@phausler
Copy link
Contributor Author

@swift-ci please smoke test macOS

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

…off an `untilFinished` version that allows developers to specify when the iteration should terminate (including Optional elements)
@phausler
Copy link
Contributor Author

@swift-ci please smoke test

@phausler
Copy link
Contributor Author

@swift-ci please build toolchain

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

@@ -31,13 +31,18 @@ _$s11Observation0A9RegistrarVSHAAMc
_$s11Observation0A9RegistrarVSQAAMc
_$s11Observation0A9RegistrarVSeAAMc
_$s11Observation10ObservableMp
_$s11Observation8ObservedV13untilFinishedyACyxq_GAC9IterationOyxq__GyYbq_YKYAcFZ
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change test/abi/macOS/arm64/observation.swift instead of editing the baselines. (you also don't need to edit the assert test unless there's an asserts only API)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the other file is not discoverable btw... the error should indicate exactly what needs to happen instead - and also it should ideally not require mangled names too >.>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Azoy is the altered version correct now?

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

// and then return the freshly minted continuation to
// be immediately resumed
if case .cancelled = state.continuations[id] {
state.continuations[id] = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this eviction logic intentionally removed?

)

func withIsolatedTaskCancellationHandler<T: Sendable>(
operation: @isolated(any) () async throws -> T,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of the reimplementation here? to carry the @isolated(any) annotation? what observable behavior is altered by this? does it prevent dynamic suspension when awaiting the operation?

///
/// - Parameters:
/// - isolation: The concurrency isolation domain of the caller.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this doesn't appear to be a parameter

/// parts of `Observed` do not add additional concurrency protection for these cases; so types must
/// be safe to maintain the safe construction and usage of `Observed` when called in an explicitly
/// `nonisolated` isolation domain.
/// The emit closure is responsible for extracting a value out of a single or many `@Observable` types.
///
/// - Parameters:
/// - isolation: The concurrency isolation domain of the caller.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no longer appears to be a parameter

static func willChange(isolation iterationIsolation: isolated (any Actor)? = #isolation, state: _ManagedCriticalState<State>, id: Int) async {
return await withUnsafeContinuation(isolation: iterationIsolation) { continuation in
state.withCriticalRegion { state in
defer { state.dirty = false }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after thinking about this a bit more, it seems to me resetting the flag in this way may admit the same problem as before, albeit in a more convoluted manner:

two iterators in separate Tasks, A & B both started & suspended awaiting a change

onChange event occurs
  - dirty = true
  - resume A & B continuations

A resumes
B resumes

A invokes closure
A onChange observer installed
A returns value from next()

B invokes closure
B onChange observer installed
B returns value from next()

onChange event occurs
  - dirty = true
  - no continuations to resume

!! N.B. we now have no observation tracking installed

A begins willChange – just before critical region
A Task cancelled
A records canceled tombstone
A enters willChange critical region
A exits with canceled path
  - dirty = false

B enters willChange
B sees dirty = false
B installs continuation

at this point B is awaiting a willChange, but there is
no longer observation tracking installed, so it will never
be resumed (unless a new iterator is created at some point
and a change to the observed values occurs after that).

hopefully there are no logical flaws there... i haven't tried to actually implement an example that exercises this scenario

@phausler
Copy link
Contributor Author

@swift-ci please smoke test

}
} onChange: { [state] in
// resume all cases where the awaiting continuations are awaiting a willSet
State.emitWillChange(state)
Copy link

@ronnienessa ronnienessa May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this implementation can leak the state object if no further changes are emitted after every observer has stopped iterating the Observability async sequence. A similar concern came up during the SE-0395 review.

For long-lived observers (e.g. watching a model for the entire process lifetime), that leak may be acceptable. In workloads where tasks start and cancel Observability iterations frequently, though, the leak could accumulate and become problematic.

Have you explored adding a bit more bookkeeping—perhaps reference counting or a shared cancellation token—so that state is released as soon as the final iterator goes away? I’d be interested to hear whether that trade-off was considered or if there’s a better approach.

@phausler
Copy link
Contributor Author

phausler commented Jun 2, 2025

@swift-ci please smoke test linux

@phausler phausler merged commit d9c73d3 into swiftlang:main Jun 10, 2025
3 checks passed
phausler added a commit to phausler/swift that referenced this pull request Jun 11, 2025
stephentyrone added a commit that referenced this pull request Jun 12, 2025
…acked values over time (#79817) (#82197)

- **Explanation**:
This is an implementation for the feature
swiftlang/swift-evolution#2726
- **Scope**:
This is targeted specifically to the Observation module, no language or
runtime changes
- **Issues**:

- **Original PRs**:
#79817
- **Risk**:
Low
- **Testing**:
This was tested in external forms (which need to be merged in)
- **Reviewers**:

---------

Co-authored-by: Stephen Canon <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants