Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d37979e
Add new route types
estringana Aug 13, 2025
259212e
Fix errors
estringana Aug 13, 2025
5cb6537
Add missing action
estringana Aug 14, 2025
70c4e7e
Type enums
estringana Aug 21, 2025
b4c2d57
Simplify structs
estringana Aug 21, 2025
27e0534
wip
estringana Aug 25, 2025
7da8e32
wip
estringana Aug 25, 2025
becf9e4
Amend endpoints payload
estringana Aug 25, 2025
a359ff9
Fix return
estringana Aug 25, 2025
8a9ff91
Avoid writing all endpoints to shared memory
estringana Aug 26, 2025
1fd27d5
Fix cloning too much for telemetry workers
bwoebi Sep 18, 2025
e3cf9d0
Add more fields to endpoints
estringana Sep 23, 2025
9f68011
Temp changes
estringana Sep 25, 2025
1087177
Add string vectors
estringana Sep 25, 2025
a27db1c
Add i32 vector
estringana Sep 29, 2025
c9b5466
Add authentication vec
estringana Sep 29, 2025
8f78c23
Add serde json
estringana Sep 29, 2025
710e269
Replace char_c for slices
estringana Sep 29, 2025
e3ff460
Make response code a non vector
estringana Sep 29, 2025
f702551
Drop pointers
estringana Sep 30, 2025
96e1445
Amend linting
estringana Sep 30, 2025
4c25224
Wip
estringana Oct 6, 2025
1341fca
Fix add-routes-collection (#1264)
estringana Oct 15, 2025
855e1b6
Rename app endpoint payload
estringana Oct 15, 2025
58722e7
Ensure it returns always a box
estringana Oct 16, 2025
4817698
Remove response code
estringana Oct 17, 2025
de389af
Remove non used fields
estringana Oct 17, 2025
2736637
Revert changes non required
estringana Oct 17, 2025
31ba596
Stop using store
estringana Oct 17, 2025
869a272
Store endpoints on a hashset
estringana Oct 20, 2025
47f7d57
Lint
estringana Oct 20, 2025
436bbdf
License
estringana Oct 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion LICENSE-3rdparty.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, cc_utils, datadog-crashtracker-ffi, ddcommon-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-stats, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log, datadog-log-ffi, ddsketch-ffi, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib
root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, ddcommon-ffi, cc_utils, datadog-crashtracker-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-stats, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log, datadog-log-ffi, ddsketch-ffi, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib
third_party_libraries:
- package_name: addr2line
package_version: 0.24.2
Expand Down
1 change: 1 addition & 0 deletions datadog-sidecar-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ paste = "1"
libc = "0.2"
tracing = { version = "0.1", default-features = false }
rmp-serde = "1.1.1"
serde_json = "1.0"


[target.'cfg(windows)'.dependencies]
Expand Down
30 changes: 30 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,36 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig(
MaybeError::None
}

/// Reports an endpoint to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: &QueueId,
method: ddtelemetry::data::Method,
path: CharSlice,
operation_name: CharSlice,
resource_name: CharSlice,
) -> MaybeError {
#[allow(clippy::unwrap_used)]
let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint {
method: Some(method),
path: Some(path.to_utf8_lossy().into_owned()),
operation_name: operation_name.to_utf8_lossy().into_owned(),
resource_name: resource_name.to_utf8_lossy().into_owned(),
});

try_c!(blocking::enqueue_actions(
transport,
instance_id,
queue_id,
vec![SidecarAction::Telemetry(endpoint)],
));

MaybeError::None
}

/// Reports a dependency to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
Expand Down
9 changes: 7 additions & 2 deletions datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tracing::{debug, error, info, trace, warn};

use futures::FutureExt;
Expand Down Expand Up @@ -422,7 +422,7 @@ impl SidecarInterface for SidecarServer {
);
let mut telemetry = telemetry_mutex.lock_or_panic();

let mut actions_to_process = vec![];
let mut actions_to_process: Vec<SidecarAction> = vec![];
let mut composer_paths_to_process = vec![];
let mut buffered_info_changed = false;
let mut remove_entry = false;
Expand Down Expand Up @@ -450,6 +450,11 @@ impl SidecarInterface for SidecarServer {
SidecarAction::ClearQueueId => {
remove_entry = true;
}
SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => {
telemetry.last_endpoints_push = SystemTime::now();
buffered_info_changed = true;
actions_to_process.push(action);
}
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop,
)) => {
Expand Down
3 changes: 3 additions & 0 deletions datadog-sidecar/src/service/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct TelemetryCachedClient {
pub config_sent: bool,
pub buffered_integrations: HashSet<Integration>,
pub buffered_composer_paths: HashSet<PathBuf>,
pub last_endpoints_push: SystemTime,
pub telemetry_metrics: HashMap<String, ContextKey>,
pub handle: Option<JoinHandle<()>>,
}
Expand Down Expand Up @@ -101,6 +102,7 @@ impl TelemetryCachedClient {
config_sent: false,
buffered_integrations: HashSet::new(),
buffered_composer_paths: HashSet::new(),
last_endpoints_push: SystemTime::UNIX_EPOCH,
telemetry_metrics: Default::default(),
handle: None,
}
Expand All @@ -111,6 +113,7 @@ impl TelemetryCachedClient {
&self.config_sent,
&self.buffered_integrations,
&self.buffered_composer_paths,
&self.last_endpoints_push,
)) {
self.shm_writer.write(&buf);
} else {
Expand Down
3 changes: 2 additions & 1 deletion ddtelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ tracing = { version = "0.1", default-features = false }
uuid = { version = "1.3", features = ["v4"] }
hashbrown = "0.15"

ddcommon = { path = "../ddcommon", default-features = false }
ddcommon = { path = "../ddcommon", default-features = false}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MMmm do I need this?

ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
datadog-ddsketch = { path = "../ddsketch" }

[target."cfg(unix)".dependencies]
Expand Down
2 changes: 2 additions & 0 deletions ddtelemetry/src/data/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub enum Payload {
AppDependenciesLoaded(AppDependenciesLoaded),
AppIntegrationsChange(AppIntegrationsChange),
AppClientConfigurationChange(AppClientConfigurationChange),
AppEndpoints(AppEndpoints),
AppHeartbeat(#[serde(skip_serializing)] ()),
AppClosing(#[serde(skip_serializing)] ()),
GenerateMetrics(GenerateMetrics),
Expand All @@ -29,6 +30,7 @@ impl Payload {
AppDependenciesLoaded(_) => "app-dependencies-loaded",
AppIntegrationsChange(_) => "app-integrations-change",
AppClientConfigurationChange(_) => "app-client-configuration-change",
AppEndpoints(_) => "app-endpoints",
AppHeartbeat(_) => "app-heartbeat",
AppClosing(_) => "app-closing",
GenerateMetrics(_) => "generate-metrics",
Expand Down
58 changes: 58 additions & 0 deletions ddtelemetry/src/data/payloads.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::hash::Hasher;

use crate::data::metrics;

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -61,6 +63,12 @@ pub struct AppClientConfigurationChange {
pub configuration: Vec<Configuration>,
}

#[derive(Debug, Serialize)]
pub struct AppEndpoints {
pub is_first: bool,
pub endpoints: Vec<serde_json::Value>,
}

#[derive(Serialize, Debug)]
pub struct GenerateMetrics {
pub series: Vec<metrics::Serie>,
Expand Down Expand Up @@ -95,3 +103,53 @@ pub enum LogLevel {
Warn,
Debug,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "UPPERCASE")]
#[repr(C)]
pub enum Method {
Get = 0,
Post = 1,
Put = 2,
Delete = 3,
Patch = 4,
Head = 5,
Options = 6,
Trace = 7,
Connect = 8,
Other = 9, //This is specified as "*" in the OpenAPI spec
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct Endpoint {
#[serde(default)]
pub method: Option<Method>,
#[serde(default)]
pub path: Option<String>,
pub operation_name: String,
pub resource_name: String,
}

impl PartialEq for Endpoint {
fn eq(&self, other: &Self) -> bool {
self.resource_name == other.resource_name
}
}

impl Eq for Endpoint {}

impl std::hash::Hash for Endpoint {
fn hash<H: Hasher>(&self, state: &mut H) {
self.resource_name.hash(state);
}
}

impl Endpoint {
pub fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
let result = serde_json::to_value(self);
match result {
Ok(value) => Ok(value),
Err(err) => Err(err),
}
}
}
36 changes: 30 additions & 6 deletions ddtelemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ pub mod store;

use crate::{
config::Config,
data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry},
data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry},
metrics::{ContextKey, MetricBuckets, MetricContexts},
};
use ddcommon::Endpoint;
use ddcommon::{hyper_migration, tag::Tag, worker::Worker};

use std::fmt::Debug;
use std::iter::Sum;
use std::ops::Add;
use std::{
Expand All @@ -26,6 +24,7 @@ use std::{
},
time,
};
use std::{collections::HashSet, fmt::Debug};

use crate::metrics::MetricBucketStats;
use futures::{
Expand Down Expand Up @@ -89,6 +88,7 @@ pub enum TelemetryActions {
AddDependency(Dependency),
AddIntegration(Integration),
AddLog((LogIdentifier, Log)),
AddEndpoint(Endpoint),
Lifecycle(LifecycleAction),
#[serde(skip)]
CollectStats(oneshot::Sender<TelemetryWorkerStats>),
Expand Down Expand Up @@ -120,6 +120,7 @@ struct TelemetryWorkerData {
dependencies: store::Store<Dependency>,
configurations: store::Store<data::Configuration>,
integrations: store::Store<data::Integration>,
endpoints: HashSet<data::Endpoint>,
logs: store::QueueHashMap<LogIdentifier, Log>,
metric_contexts: MetricContexts,
metric_buckets: MetricBuckets,
Expand Down Expand Up @@ -350,7 +351,11 @@ impl TelemetryWorker {
}
}
}
AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
AddConfig(_)
| AddDependency(_)
| AddIntegration(_)
| AddEndpoint(_)
| Lifecycle(ExtendedHeartbeat) => {}
Lifecycle(Stop) => {
if !self.data.started {
return BREAK;
Expand Down Expand Up @@ -407,6 +412,9 @@ impl TelemetryWorker {
AddDependency(dep) => self.data.dependencies.insert(dep),
AddIntegration(integration) => self.data.integrations.insert(integration),
AddConfig(cfg) => self.data.configurations.insert(cfg),
AddEndpoint(endpoint) => {
self.data.endpoints.insert(endpoint);
}
AddLog((identifier, log)) => {
let (l, new) = self.data.logs.get_mut_or_insert(identifier, log);
if !new {
Expand Down Expand Up @@ -551,6 +559,18 @@ impl TelemetryWorker {
},
))
}
if !self.data.endpoints.is_empty() {
payloads.push(data::Payload::AppEndpoints(data::AppEndpoints {
is_first: true,
endpoints: self
.data
.endpoints
.iter()
.map(|e| e.to_json_value().unwrap_or_default())
.filter(|e| e.is_object())
.collect(),
}));
}
payloads
}

Expand Down Expand Up @@ -653,6 +673,7 @@ impl TelemetryWorker {
.data
.configurations
.removed_flushed(p.configuration.len()),
AppEndpoints(_) => self.data.endpoints.clear(),
MessageBatch(batch) => {
for p in batch {
self.payload_sent_success(p);
Expand Down Expand Up @@ -756,7 +777,7 @@ impl TelemetryWorker {
let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() {
endpoint.timeout_ms
} else {
Endpoint::DEFAULT_TIMEOUT
ddcommon::Endpoint::DEFAULT_TIMEOUT
};

debug!(
Expand Down Expand Up @@ -995,7 +1016,7 @@ impl TelemetryWorkerHandle {
}
}

/// How many dependencies/integrations/configs we keep in memory at most
/// How many dependencies/integrations/configs/endpoints we keep in memory at most
pub const MAX_ITEMS: usize = 5000;

#[derive(Debug, Default, Clone, Copy)]
Expand All @@ -1015,6 +1036,7 @@ pub struct TelemetryWorkerBuilder {
pub dependencies: store::Store<data::Dependency>,
pub integrations: store::Store<data::Integration>,
pub configurations: store::Store<data::Configuration>,
pub endpoints: HashSet<data::Endpoint>,
pub native_deps: bool,
pub rust_shared_lib_deps: bool,
pub config: Config,
Expand Down Expand Up @@ -1065,6 +1087,7 @@ impl TelemetryWorkerBuilder {
dependencies: store::Store::new(MAX_ITEMS),
integrations: store::Store::new(MAX_ITEMS),
configurations: store::Store::new(MAX_ITEMS),
endpoints: HashSet::new(),
native_deps: true,
rust_shared_lib_deps: false,
config: Config::default(),
Expand Down Expand Up @@ -1095,6 +1118,7 @@ impl TelemetryWorkerBuilder {
dependencies: self.dependencies,
integrations: self.integrations,
configurations: self.configurations,
endpoints: self.endpoints,
logs: store::QueueHashMap::default(),
metric_contexts: contexts.clone(),
metric_buckets: MetricBuckets::default(),
Expand Down
Loading