Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 76 additions & 23 deletions src/elasticotel/distro/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
from dataclasses import dataclass, field

from opentelemetry import trace

Expand All @@ -27,7 +30,7 @@

logger = logging.getLogger(__name__)

_LOG_LEVELS_MAP = {
_LOG_LEVELS_MAP: dict[str, int] = {
"trace": 5,
"debug": logging.DEBUG,
"info": logging.INFO,
Expand All @@ -38,16 +41,40 @@
}

# Fallback values used when the remote config does not carry the corresponding key.
DEFAULT_SAMPLING_RATE = 1.0
DEFAULT_LOGGING_LEVEL = "info"

# Key names looked up in the remote config and reused as keys of the dict built by Config.to_dict().
LOGGING_LEVEL_CONFIG_KEY = "logging_level"
SAMPLING_RATE_CONFIG_KEY = "sampling_rate"


@dataclass
class ConfigItem:
    """A single configuration setting, stored in its string form."""

    # the setting as a string, e.g. "info" for a logging level or "1.0" for a sampling rate
    value: str


@dataclass
class ConfigUpdate:
    """Outcome of applying one setting coming from the remote config."""

    # empty string means the update produced no error to report
    error_message: str = ""


# TODO: this should grow into a proper configuration store initialized from env vars and so on
@dataclass
class Config:
    """Current values of the settings handled by this module.

    Every setting is kept as a ``ConfigItem`` and starts from the module-level
    defaults. ``to_dict`` flattens the settings to plain strings keyed by their
    configuration key names.
    """

    # The attributes are annotated so that ``@dataclass`` registers them as real
    # fields (un-annotated names are plain class attributes: they are ignored by
    # the generated ``__init__``/``__repr__``/``__eq__`` and shared by all instances).
    # ``default_factory`` gives each instance its own ConfigItem objects.
    sampling_rate: ConfigItem = field(default_factory=lambda: ConfigItem(value=str(DEFAULT_SAMPLING_RATE)))
    logging_level: ConfigItem = field(default_factory=lambda: ConfigItem(value=DEFAULT_LOGGING_LEVEL))

    def to_dict(self) -> dict[str, str]:
        """Return the string value of every setting keyed by its configuration key."""
        return {
            LOGGING_LEVEL_CONFIG_KEY: self.logging_level.value,
            SAMPLING_RATE_CONFIG_KEY: self.sampling_rate.value,
        }


# Module-level configuration instance; this is the object returned by _get_config().
_config = Config()

def _handle_logging_level(config) -> str:
error_message = ""

def _handle_logging_level(remote_config) -> ConfigUpdate:
_config = _get_config()
# when config option has default value you don't get it so need to handle the default
config_logging_level = config.get("logging_level")
if config_logging_level is not None:
logging_level = _LOG_LEVELS_MAP.get(config_logging_level) # type: ignore[reportArgumentType]
else:
logging_level = logging.INFO
config_logging_level = remote_config.get(LOGGING_LEVEL_CONFIG_KEY, DEFAULT_LOGGING_LEVEL)
logging_level = _LOG_LEVELS_MAP.get(config_logging_level)

if logging_level is None:
logger.error("Logging level not handled: %s", config_logging_level)
Expand All @@ -56,11 +83,14 @@ def _handle_logging_level(config) -> str:
# update upstream and distro logging levels
logging.getLogger("opentelemetry").setLevel(logging_level)
logging.getLogger("elasticotel").setLevel(logging_level)
return error_message
_config.logging_level = ConfigItem(value=config_logging_level)
error_message = ""
return ConfigUpdate(error_message=error_message)


def _handle_sampling_rate(config) -> str:
config_sampling_rate = config.get("sampling_rate")
def _handle_sampling_rate(remote_config) -> ConfigUpdate:
_config = _get_config()
config_sampling_rate = remote_config.get(SAMPLING_RATE_CONFIG_KEY, str(DEFAULT_SAMPLING_RATE))
sampling_rate = DEFAULT_SAMPLING_RATE
if config_sampling_rate is not None:
try:
Expand All @@ -69,17 +99,17 @@ def _handle_sampling_rate(config) -> str:
raise ValueError()
except ValueError:
logger.error("Invalid `sampling_rate` from config `%s`", config_sampling_rate)
return f"Invalid sampling_rate {config_sampling_rate}"
return ConfigUpdate(error_message=f"Invalid sampling_rate {config_sampling_rate}")

sampler = getattr(trace.get_tracer_provider(), "sampler", None)
if sampler is None:
logger.debug("Cannot get sampler from tracer provider.")
return ""
return ConfigUpdate()

# FIXME: this needs to be updated for the consistent probability samplers
if not isinstance(sampler, ParentBasedTraceIdRatio):
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
return ""
return ConfigUpdate()

# since sampler is parent based we need to update its root sampler
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
Expand All @@ -88,32 +118,55 @@ def _handle_sampling_rate(config) -> str:
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
logger.debug("Updated sampler rate to %s", sampling_rate)
return ""
_config.sampling_rate = ConfigItem(value=config_sampling_rate)
return ConfigUpdate()


def _report_full_state(message: opamp_pb2.ServerToAgent):
    """Return the ReportFullState bit of ``message.flags``.

    The result is the raw bitwise AND, so it is non-zero (truthy) only when
    the flag is set on the message.
    """
    full_state_flag = opamp_pb2.ServerToAgentFlags_ReportFullState
    return message.flags & full_state_flag


def _get_config():
    """Return the module-level ``_config`` instance."""
    # reading a module global needs no ``global`` declaration
    return _config


def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
# server wants us to report full state as it cannot recognize us as agent because
# e.g it may have been restarted and lost state.
if _report_full_state(message):
# here we're not returning explicitly but usually we don't get a remote config when we get the flag set
payload = client._build_full_state_message()
agent.send(payload=payload)

# we check config_hash because we need to track last received config and remote_config seems to be always truthy
if not message.remote_config or not message.remote_config.config_hash:
return

_config = _get_config()
error_messages = []
for config_filename, config in messages._decode_remote_config(message.remote_config):
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
# we don't have standardized config values so limit to configs coming from our backend
if config_filename == "elastic":
logger.debug("Config %s: %s", config_filename, config)
error_message = _handle_logging_level(config)
if error_message:
error_messages.append(error_message)
logger.debug("Config %s: %s", config_filename, remote_config)
config_update = _handle_logging_level(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)

error_message = _handle_sampling_rate(config)
if error_message:
error_messages.append(error_message)
config_update = _handle_sampling_rate(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)

error_message = "\n".join(error_messages)
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
updated_remote_config = client._update_remote_config_status(
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message
)

# update the cached effective config with what we updated
effective_config = {"elastic": _config.to_dict()}
client._update_effective_config(effective_config)

# if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response
if updated_remote_config is not None:
payload = client._build_remote_config_status_response_message(updated_remote_config)
Expand Down
18 changes: 18 additions & 0 deletions src/opentelemetry/_opamp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsHeartbeat
| opamp_pb2.AgentCapabilities.AgentCapabilities_AcceptsRemoteConfig
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsRemoteConfig
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsEffectiveConfig
)


Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(
self._sequence_num: int = 0
self._instance_uid: bytes = uuid7().bytes
self._remote_config_status: opamp_pb2.RemoteConfigStatus | None = None
self._effective_config: opamp_pb2.EffectiveConfig | None = None

def _build_connection_message(self) -> bytes:
message = messages._build_presentation_message(
Expand Down Expand Up @@ -101,6 +103,10 @@ def _build_heartbeat_message(self) -> bytes:
data = messages._encode_message(message)
return data

def _update_effective_config(self, effective_config: dict[str, dict[str, str]]) -> opamp_pb2.EffectiveConfig:
    """Build the effective config message from ``effective_config``, cache it on the client and return it."""
    built_config = messages._build_effective_config_message(effective_config)
    # keep the message around so that it can be included when the full state is built
    self._effective_config = built_config
    return built_config

def _update_remote_config_status(
self, remote_config_hash: bytes, status: opamp_pb2.RemoteConfigStatuses.ValueType, error_message: str = ""
) -> opamp_pb2.RemoteConfigStatus | None:
Expand Down Expand Up @@ -132,6 +138,18 @@ def _build_remote_config_status_response_message(self, remote_config_status: opa
data = messages._encode_message(message)
return data

def _build_full_state_message(self) -> bytes:
    """Return the encoded message carrying everything the client currently knows about itself.

    The message is built from the instance uid, agent description, sequence
    number, handled capabilities and the cached remote config status and
    effective config.
    """
    full_state = messages._build_full_state_message(
        instance_uid=self._instance_uid,
        sequence_num=self._sequence_num,
        agent_description=self._agent_description,
        capabilities=_HANDLED_CAPABILITIES,
        remote_config_status=self._remote_config_status,
        effective_config=self._effective_config,
    )
    return messages._encode_message(full_state)

def _send(self, data: bytes):
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
Expand Down
30 changes: 30 additions & 0 deletions src/opentelemetry/_opamp/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,36 @@ def _build_remote_config_status_response_message(
return command


def _build_effective_config_message(config: dict[str, dict[str, str]]):
    """Build an ``EffectiveConfig`` message from a mapping of file name to settings.

    Each settings dict is serialized as JSON, encoded as UTF-8 and stored as the
    body of the entry for its file name, with content type ``application/json``.
    """
    file_map = opamp_pb2.AgentConfigMap()
    for name, settings in config.items():
        # look the entry up once and fill both of its attributes
        entry = file_map.config_map[name]
        entry.body = json.dumps(settings).encode("utf-8")
        entry.content_type = "application/json"
    return opamp_pb2.EffectiveConfig(config_map=file_map)


def _build_full_state_message(
    instance_uid: bytes,
    sequence_num: int,
    agent_description: opamp_pb2.AgentDescription,
    capabilities: int,
    remote_config_status: opamp_pb2.RemoteConfigStatus | None,
    effective_config: opamp_pb2.EffectiveConfig | None,
) -> opamp_pb2.AgentToServer:
    """Build an ``AgentToServer`` message populated with all the given pieces of state.

    ``remote_config_status`` and ``effective_config`` may be ``None``; they are
    passed to the message constructor as they are.
    """
    fields = {
        "instance_uid": instance_uid,
        "sequence_num": sequence_num,
        "agent_description": agent_description,
        "remote_config_status": remote_config_status,
        "effective_config": effective_config,
        "capabilities": capabilities,
    }
    return opamp_pb2.AgentToServer(**fields)


def _encode_message(data: opamp_pb2.AgentToServer) -> bytes:
    """Serialize ``data`` to bytes via its ``SerializeToString`` method."""
    serialized = data.SerializeToString()
    return serialized

Expand Down
Loading