Skip to content

Conversation

ashgti
Copy link
Contributor

@ashgti ashgti commented Aug 21, 2025

This reverts commit 0f33b90 and includes a fix for the added test that was submitted between my last update and pull.

@ashgti ashgti requested a review from JDevlieghere as a code owner August 21, 2025 19:59
@ashgti ashgti requested a review from da-viper August 21, 2025 19:59
@llvmbot llvmbot added the lldb label Aug 21, 2025
@llvmbot
Copy link
Member

llvmbot commented Aug 21, 2025

@llvm/pr-subscribers-lldb

Author: John Harrison (ashgti)

Changes

This reverts commit 0f33b90 and includes a fix for the added test that was submitted between my last update and pull.


Patch is 82.96 KiB, truncated to 20.00 KiB below, full version: https://github.com/llvm/llvm-project/pull/154832.diff

15 Files Affected:

  • (modified) lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py (+459-347)
  • (modified) lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (+51-36)
  • (modified) lldb/test/API/tools/lldb-dap/attach/TestDAP_attach.py (+4-4)
  • (modified) lldb/test/API/tools/lldb-dap/breakpoint-assembly/TestDAP_breakpointAssembly.py (+4-5)
  • (modified) lldb/test/API/tools/lldb-dap/breakpoint-events/TestDAP_breakpointEvents.py (+2-2)
  • (modified) lldb/test/API/tools/lldb-dap/breakpoint/TestDAP_setBreakpoints.py (+22-11)
  • (modified) lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py (+8-13)
  • (modified) lldb/test/API/tools/lldb-dap/commands/TestDAP_commands.py (+6-9)
  • (modified) lldb/test/API/tools/lldb-dap/console/TestDAP_console.py (+3-9)
  • (modified) lldb/test/API/tools/lldb-dap/instruction-breakpoint/TestDAP_instruction_breakpoint.py (+1-1)
  • (modified) lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py (+8-17)
  • (modified) lldb/test/API/tools/lldb-dap/module-event/TestDAP_module_event.py (+5-5)
  • (modified) lldb/test/API/tools/lldb-dap/module/TestDAP_module.py (+4-6)
  • (modified) lldb/test/API/tools/lldb-dap/output/TestDAP_output.py (+3-3)
  • (modified) lldb/test/API/tools/lldb-dap/progress/TestDAP_Progress.py (+1-1)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 7acb9c89b8b7d..0608ac3fd83be 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -12,15 +12,91 @@
 import sys
 import threading
 import time
-from typing import Any, Optional, Union, BinaryIO, TextIO
+from typing import (
+    Any,
+    Optional,
+    Dict,
+    cast,
+    List,
+    Callable,
+    IO,
+    Union,
+    BinaryIO,
+    TextIO,
+    TypedDict,
+    Literal,
+)
 
 ## DAP type references
-Event = dict[str, Any]
-Request = dict[str, Any]
-Response = dict[str, Any]
+
+
+class Event(TypedDict):
+    type: Literal["event"]
+    seq: int
+    event: str
+    body: Any
+
+
+class Request(TypedDict, total=False):
+    type: Literal["request"]
+    seq: int
+    command: str
+    arguments: Any
+
+
+class Response(TypedDict):
+    type: Literal["response"]
+    seq: int
+    request_seq: int
+    success: bool
+    command: str
+    message: Optional[str]
+    body: Any
+
+
 ProtocolMessage = Union[Event, Request, Response]
 
 
+class Source(TypedDict, total=False):
+    name: str
+    path: str
+    sourceReference: int
+
+    @staticmethod
+    def build(
+        *,
+        name: Optional[str] = None,
+        path: Optional[str] = None,
+        source_reference: Optional[int] = None,
+    ) -> "Source":
+        """Builds a source from the given name, path or source_reference."""
+        if not name and not path and not source_reference:
+            raise ValueError(
+                "Source.build requires either name, path, or source_reference"
+            )
+
+        s = Source()
+        if name:
+            s["name"] = name
+        if path:
+            if not name:
+                s["name"] = os.path.basename(path)
+            s["path"] = path
+        if source_reference is not None:
+            s["sourceReference"] = source_reference
+        return s
+
+
+class Breakpoint(TypedDict, total=False):
+    id: int
+    verified: bool
+    source: Source
+
+    @staticmethod
+    def is_verified(src: "Breakpoint") -> bool:
+        return src.get("verified", False)
+
+
 def dump_memory(base_addr, data, num_per_line, outfile):
     data_len = len(data)
     hex_string = binascii.hexlify(data)
@@ -58,7 +134,9 @@ def dump_memory(base_addr, data, num_per_line, outfile):
         outfile.write("\n")
 
 
-def read_packet(f, verbose=False, trace_file=None):
+def read_packet(
+    f: IO[bytes], trace_file: Optional[IO[str]] = None
+) -> Optional[ProtocolMessage]:
     """Decode a JSON packet that starts with the content length and is
     followed by the JSON bytes from a file 'f'. Returns None on EOF.
     """
@@ -70,19 +148,13 @@ def read_packet(f, verbose=False, trace_file=None):
     prefix = "Content-Length: "
     if line.startswith(prefix):
         # Decode length of JSON bytes
-        if verbose:
-            print('content: "%s"' % (line))
         length = int(line[len(prefix) :])
-        if verbose:
-            print('length: "%u"' % (length))
         # Skip empty line
-        line = f.readline()
-        if verbose:
-            print('empty: "%s"' % (line))
+        separator = f.readline().decode()
+        if separator != "":
+            Exception("malformed DAP content header, unexpected line: " + separator)
         # Read JSON bytes
-        json_str = f.read(length)
-        if verbose:
-            print('json: "%s"' % (json_str))
+        json_str = f.read(length).decode()
         if trace_file:
             trace_file.write("from adapter:\n%s\n" % (json_str))
         # Decode the JSON bytes into a python dictionary
@@ -95,7 +167,7 @@ def packet_type_is(packet, packet_type):
     return "type" in packet and packet["type"] == packet_type
 
 
-def dump_dap_log(log_file):
+def dump_dap_log(log_file: Optional[str]) -> None:
     print("========= DEBUG ADAPTER PROTOCOL LOGS =========", file=sys.stderr)
     if log_file is None:
         print("no log file available", file=sys.stderr)
@@ -105,58 +177,6 @@ def dump_dap_log(log_file):
     print("========= END =========", file=sys.stderr)
 
 
-class Source(object):
-    def __init__(
-        self,
-        path: Optional[str] = None,
-        source_reference: Optional[int] = None,
-        raw_dict: Optional[dict[str, Any]] = None,
-    ):
-        self._name = None
-        self._path = None
-        self._source_reference = None
-        self._raw_dict = None
-
-        if path is not None:
-            self._name = os.path.basename(path)
-            self._path = path
-        elif source_reference is not None:
-            self._source_reference = source_reference
-        elif raw_dict is not None:
-            self._raw_dict = raw_dict
-        else:
-            raise ValueError("Either path or source_reference must be provided")
-
-    def __str__(self):
-        return f"Source(name={self.name}, path={self.path}), source_reference={self.source_reference})"
-
-    def as_dict(self):
-        if self._raw_dict is not None:
-            return self._raw_dict
-
-        source_dict = {}
-        if self._name is not None:
-            source_dict["name"] = self._name
-        if self._path is not None:
-            source_dict["path"] = self._path
-        if self._source_reference is not None:
-            source_dict["sourceReference"] = self._source_reference
-        return source_dict
-
-
-class Breakpoint(object):
-    def __init__(self, obj):
-        self._breakpoint = obj
-
-    def is_verified(self):
-        """Check if the breakpoint is verified."""
-        return self._breakpoint.get("verified", False)
-
-    def source(self):
-        """Get the source of the breakpoint."""
-        return self._breakpoint.get("source", {})
-
-
 class NotSupportedError(KeyError):
     """Raised if a feature is not supported due to its capabilities."""
 
@@ -174,26 +194,42 @@ def __init__(
         self.log_file = log_file
         self.send = send
         self.recv = recv
-        self.recv_packets: list[Optional[ProtocolMessage]] = []
-        self.recv_condition = threading.Condition()
-        self.recv_thread = threading.Thread(target=self._read_packet_thread)
-        self.process_event_body = None
-        self.exit_status: Optional[int] = None
-        self.capabilities: dict[str, Any] = {}
-        self.progress_events: list[Event] = []
-        self.reverse_requests = []
-        self.sequence = 1
-        self.threads = None
-        self.thread_stop_reasons = {}
-        self.recv_thread.start()
-        self.output_condition = threading.Condition()
-        self.output: dict[str, list[str]] = {}
-        self.configuration_done_sent = False
-        self.initialized = False
-        self.frame_scopes = {}
+
+        # Packets that have been received and processed but have not yet been
+        # requested by a test case.
+        self._pending_packets: List[Optional[ProtocolMessage]] = []
+        # Received packets that have not yet been processed.
+        self._recv_packets: List[Optional[ProtocolMessage]] = []
+        # Used as a mutex for _recv_packets and for notify when _recv_packets
+        # changes.
+        self._recv_condition = threading.Condition()
+        self._recv_thread = threading.Thread(target=self._read_packet_thread)
+
+        # session state
         self.init_commands = init_commands
+        self.exit_status: Optional[int] = None
+        self.capabilities: Dict = {}
+        self.initialized: bool = False
+        self.configuration_done_sent: bool = False
+        self.process_event_body: Optional[Dict] = None
+        self.terminated: bool = False
+        self.events: List[Event] = []
+        self.progress_events: List[Event] = []
+        self.reverse_requests: List[Request] = []
+        self.module_events: List[Dict] = []
+        self.sequence: int = 1
+        self.output: Dict[str, str] = {}
+
+        # debuggee state
+        self.threads: Optional[dict] = None
+        self.thread_stop_reasons: Dict[str, Any] = {}
+        self.frame_scopes: Dict[str, Any] = {}
+        # keyed by breakpoint id
         self.resolved_breakpoints: dict[str, Breakpoint] = {}
 
+        # trigger enqueue thread
+        self._recv_thread.start()
+
     @classmethod
     def encode_content(cls, s: str) -> bytes:
         return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@@ -210,267 +246,324 @@ def validate_response(cls, command, response):
             )
 
     def _read_packet_thread(self):
-        done = False
         try:
-            while not done:
+            while True:
                 packet = read_packet(self.recv, trace_file=self.trace_file)
                 # `packet` will be `None` on EOF. We want to pass it down to
                 # handle_recv_packet anyway so the main thread can handle unexpected
                 # termination of lldb-dap and stop waiting for new packets.
-                done = not self._handle_recv_packet(packet)
+                if not self._handle_recv_packet(packet):
+                    break
         finally:
             dump_dap_log(self.log_file)
 
-    def get_modules(self, startModule: int = 0, moduleCount: int = 0):
-        module_list = self.request_modules(startModule, moduleCount)["body"]["modules"]
+    def get_modules(
+        self, start_module: Optional[int] = None, module_count: Optional[int] = None
+    ) -> Dict:
+        resp = self.request_modules(start_module, module_count)
+        if not resp["success"]:
+            raise ValueError(f"request_modules failed: {resp!r}")
         modules = {}
+        module_list = resp["body"]["modules"]
         for module in module_list:
             modules[module["name"]] = module
         return modules
 
-    def get_output(self, category, timeout=0.0, clear=True):
-        self.output_condition.acquire()
-        output = None
+    def get_output(self, category: str, clear=True) -> str:
+        output = ""
         if category in self.output:
-            output = self.output[category]
+            output = self.output.get(category, "")
             if clear:
                 del self.output[category]
-        elif timeout != 0.0:
-            self.output_condition.wait(timeout)
-            if category in self.output:
-                output = self.output[category]
-                if clear:
-                    del self.output[category]
-        self.output_condition.release()
         return output
 
-    def collect_output(self, category, timeout_secs, pattern, clear=True):
-        end_time = time.time() + timeout_secs
-        collected_output = ""
-        while end_time > time.time():
-            output = self.get_output(category, timeout=0.25, clear=clear)
-            if output:
-                collected_output += output
-                if pattern is not None and pattern in output:
-                    break
-        return collected_output if collected_output else None
+    def collect_output(
+        self,
+        category: str,
+        timeout: float,
+        pattern: Optional[str] = None,
+        clear=True,
+    ) -> str:
+        """Collect output from 'output' events.
+        Args:
+            category: The category to collect.
+            timeout: The max duration for collecting output.
+            pattern:
+                Optional, if set, return once this pattern is detected in the
+                collected output.
+        Returns:
+            The collected output.
+        """
+        deadline = time.monotonic() + timeout
+        output = self.get_output(category, clear)
+        while deadline >= time.monotonic() and (
+            pattern is None or pattern not in output
+        ):
+            event = self.wait_for_event(["output"], timeout=deadline - time.monotonic())
+            if not event:  # Timeout or EOF
+                break
+            output += self.get_output(category, clear=clear)
+        return output
 
     def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
-        self.recv_condition.acquire()
-        self.recv_packets.append(packet)
-        self.recv_condition.notify()
-        self.recv_condition.release()
+        with self.recv_condition:
+            self.recv_packets.append(packet)
+            self.recv_condition.notify()
 
     def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
-        """Called by the read thread that is waiting for all incoming packets
-        to store the incoming packet in "self.recv_packets" in a thread safe
-        way. This function will then signal the "self.recv_condition" to
-        indicate a new packet is available. Returns True if the caller
-        should keep calling this function for more packets.
+        """Handles an incoming packet.
+
+        Called by the read thread that is waiting for all incoming packets
+        to store the incoming packet in "self._recv_packets" in a thread safe
+        way. This function will then signal the "self._recv_condition" to
+        indicate a new packet is available.
+
+        Args:
+            packet: A new packet to store.
+
+        Returns:
+            True if the caller should keep calling this function for more
+            packets.
         """
-        # If EOF, notify the read thread by enqueuing a None.
-        if not packet:
-            self._enqueue_recv_packet(None)
-            return False
-
-        # Check the packet to see if is an event packet
-        keepGoing = True
-        packet_type = packet["type"]
-        if packet_type == "event":
-            event = packet["event"]
-            body = None
-            if "body" in packet:
-                body = packet["body"]
-            # Handle the event packet and cache information from these packets
-            # as they come in
-            if event == "output":
-                # Store any output we receive so clients can retrieve it later.
-                category = body["category"]
-                output = body["output"]
-                self.output_condition.acquire()
-                if category in self.output:
-                    self.output[category] += output
-                else:
-                    self.output[category] = output
-                self.output_condition.notify()
-                self.output_condition.release()
-                # no need to add 'output' event packets to our packets list
-                return keepGoing
-            elif event == "initialized":
-                self.initialized = True
-            elif event == "process":
-                # When a new process is attached or launched, remember the
-                # details that are available in the body of the event
-                self.process_event_body = body
-            elif event == "exited":
-                # Process exited, mark the status to indicate the process is not
-                # alive.
-                self.exit_status = body["exitCode"]
-            elif event == "continued":
-                # When the process continues, clear the known threads and
-                # thread_stop_reasons.
-                all_threads_continued = body.get("allThreadsContinued", True)
-                tid = body["threadId"]
-                if tid in self.thread_stop_reasons:
-                    del self.thread_stop_reasons[tid]
-                self._process_continued(all_threads_continued)
-            elif event == "stopped":
-                # Each thread that stops with a reason will send a
-                # 'stopped' event. We need to remember the thread stop
-                # reasons since the 'threads' command doesn't return
-                # that information.
-                self._process_stopped()
-                tid = body["threadId"]
-                self.thread_stop_reasons[tid] = body
-            elif event.startswith("progress"):
-                # Progress events come in as 'progressStart', 'progressUpdate',
-                # and 'progressEnd' events. Keep these around in case test
-                # cases want to verify them.
-                self.progress_events.append(packet)
-            elif event == "breakpoint":
-                # Breakpoint events are sent when a breakpoint is resolved
-                self._update_verified_breakpoints([body["breakpoint"]])
-            elif event == "capabilities":
-                # Update the capabilities with new ones from the event.
-                self.capabilities.update(body["capabilities"])
-
-        elif packet_type == "response":
-            if packet["command"] == "disconnect":
-                keepGoing = False
-        self._enqueue_recv_packet(packet)
-        return keepGoing
+        with self._recv_condition:
+            self._recv_packets.append(packet)
+            self._recv_condition.notify()
+            # packet is None on EOF
+            return packet is not None and not (
+                packet["type"] == "response" and packet["command"] == "disconnect"
+            )
+
+    def _recv_packet(
+        self,
+        *,
+        predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
+        timeout: Optional[float] = None,
+    ) -> Optional[ProtocolMessage]:
+        """Processes received packets from the adapter.
+        Updates the DebugCommunication stateful properties based on the received
+        packets in the order they are received.
+        NOTE: The only time the session state properties should be updated is
+        during this call to ensure consistency during tests.
+        Args:
+            predicate:
+                Optional, if specified, returns the first packet that matches
+                the given predicate.
+            timeout:
+                Optional, if specified, processes packets until either the
+                timeout occurs or the predicate matches a packet, whichever
+                occurs first.
+        Returns:
+            The first matching packet for the given predicate, if specified,
+            otherwise None.
+        """
+        assert (
+            threading.current_thread != self._recv_thread
+        ), "Must not be called from the _recv_thread"
+
+        def process_until_match():
+            self._process_recv_packets()
+            for i, packet in enumerate(self._pending_packets):
+                if packet is None:
+                    # We need to return a truthy value to break out of the
+                    # wait_for, use `EOFError` as an indicator of EOF.
+                    return EOFError()
+                if predicate and predicate(packet):
+                    self._pending_packets.pop(i)
+                    return packet
+
+        with self._recv_condition:
+            packet = self._recv_condition.wait_for(process_until_match, timeout)
+            return None if isinstance(packet, EOFError) else packet
+
+    def _process_recv_packets(self) -> None:
+        """Process received packets, updating the session state."""
+        with self._recv_condition:
+            for packet in self._recv_packets:
+                # Handle events that may modify any stateful properties of
+                # the DAP session.
+                if packet and packet["type"] == "event":
+                    self._handle_event(packet)
+                elif packet and packet["type"] == "request":
+                    # Handle reverse requests and keep processing.
+                    self._handle_reverse_request(packet)
+                # Move the packet to the pending queue.
+                self._pending_packets.append(packet)
+            self._recv_packets.clear()
+
+    def _handle_event(self, packet: Event) -> None:
+        """Handle any events that modify debug session state we track."""
+        event = packet["event"]
+        body: Optional[Dict] = packet.get("body", None)
+
+        if event == "output" and body:
+            # Store any output we receive so clients can retrieve it later.
+            category = body["category"]
+            output = body["output"]
+            if category in self.output:
+                self.output[category] += output
+            else:
+                self.output[category] = output
+        elif event == "initialized":
+            self.initialized = True
+        elif event == "process":
+            # When a new ...
[truncated]

@ashgti ashgti merged commit 36d07ad into llvm:main Aug 21, 2025
11 checks passed
JDevlieghere pushed a commit to swiftlang/llvm-project that referenced this pull request Oct 10, 2025
)" (llvm#154832)

This reverts commit 0f33b90 and
includes a fix for the added test that was submitted between my last
update and pull.

(cherry picked from commit 36d07ad)
JDevlieghere pushed a commit to swiftlang/llvm-project that referenced this pull request Oct 13, 2025
)" (llvm#154832)

This reverts commit 0f33b90 and
includes a fix for the added test that was submitted between my last
update and pull.

(cherry picked from commit 36d07ad)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants