Skip to content

Commit 466e339

Browse files
qmfrederikk8s-ci-robot
authored andcommitted
Prototype Exec API (kubernetes-client#271)
1 parent 644cf76 commit 466e339

File tree

10 files changed

+590
-8
lines changed

10 files changed

+590
-8
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System.IO;
2+
using System.Threading.Tasks;
3+
4+
namespace k8s
5+
{
6+
/// <summary>
7+
/// A prototype for a callback which asynchronously processes the standard input, standard output and standard error of a command executing in
8+
/// a container.
9+
/// </summary>
10+
/// <param name="stdIn">
11+
/// The standard intput stream of the process.
12+
/// </param>
13+
/// <param name="stdOut">
14+
/// The standard output stream of the process.
15+
/// </param>
16+
/// <param name="stdErr">
17+
/// The standard error stream of the remote process.
18+
/// </param>
19+
/// <returns>
20+
/// A <see cref="Task"/> which represents the asynchronous processing of the process input, output and error streams. This task
21+
/// should complete once you're done interacting with the remote process.
22+
/// </returns>
23+
public delegate Task ExecAsyncCallback(Stream stdIn, Stream stdOut, Stream stdErr);
24+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace k8s
6+
{
7+
public partial interface IKubernetes
8+
{
9+
/// <summary>
10+
/// Executes a command in a container in a pod.
11+
/// </summary>
12+
/// <param name="name">
13+
/// The name of the pod which contains the container in which to execute the ocmmand.
14+
/// </param>
15+
/// <param name="namespace">
16+
/// The namespace of the container.
17+
/// </param>
18+
/// <param name="container">
19+
/// The container in which to run the command.
20+
/// </param>
21+
/// <param name="command">
22+
/// The command to execute.
23+
/// </param>
24+
/// <param name="action">
25+
/// A callback which processes the standard input, standard output and standard error.
26+
/// </param>
27+
/// <param name="cancellationToken">
28+
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
29+
/// </param>
30+
/// <returns>
31+
/// A <see cref="Task"/> which represents the asynchronous operation.
32+
/// </returns>
33+
Task<int> NamespacedPodExecAsync(string name, string @namespace, string container, IEnumerable<string> command, bool tty, ExecAsyncCallback action, CancellationToken cancellationToken);
34+
}
35+
}

src/KubernetesClient/IKubernetes.WebSocket.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,57 @@ public partial interface IKubernetes
109109
/// </return>
110110
Task<WebSocket> WebSocketNamespacedPodExecAsync(string name, string @namespace = "default", IEnumerable<string> command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, bool tty = true, string webSocketSubProtol = null, Dictionary<string, List<string>> customHeaders = null, CancellationToken cancellationToken = default(CancellationToken));
111111

112+
/// <summary>
113+
/// Executes a command in a pod.
114+
/// </summary>
115+
/// <param name='name'>
116+
/// name of the Pod
117+
/// </param>
118+
/// <param name='namespace'>
119+
/// object name and auth scope, such as for teams and projects
120+
/// </param>
121+
/// <param name='command'>
122+
/// Command is the remote command to execute. argv array. Not executed within a
123+
/// shell.
124+
/// </param>
125+
/// <param name='container'>
126+
/// Container in which to execute the command. Defaults to only container if
127+
/// there is only one container in the pod.
128+
/// </param>
129+
/// <param name='stderr'>
130+
/// Redirect the standard error stream of the pod for this call. Defaults to
131+
/// <see langword="true"/>.
132+
/// </param>
133+
/// <param name='stdin'>
134+
/// Redirect the standard input stream of the pod for this call. Defaults to
135+
/// <see langword="true"/>.
136+
/// </param>
137+
/// <param name='stdout'>
138+
/// Redirect the standard output stream of the pod for this call. Defaults to
139+
/// <see langword="true"/>.
140+
/// </param>
141+
/// <param name='tty'>
142+
/// TTY if true indicates that a tty will be allocated for the exec call.
143+
/// Defaults to <see langword="true"/>.
144+
/// </param>
145+
/// <param name="webSocketSubProtocol">
146+
/// The Kubernetes-specific WebSocket sub protocol to use. See <see cref="WebSocketProtocol"/> for a list of available
147+
/// protocols.
148+
/// </param>
149+
/// <param name='customHeaders'>
150+
/// Headers that will be added to request.
151+
/// </param>
152+
/// <param name='cancellationToken'>
153+
/// The cancellation token.
154+
/// </param>
155+
/// <exception cref="ArgumentNullException">
156+
/// Thrown when a required parameter is null
157+
/// </exception>
158+
/// <return>
159+
/// A <see cref="IStreamDemuxer"/> which can be used to communicate with the process running in the pod.
160+
/// </return>
161+
Task<IStreamDemuxer> MuxedStreamNamespacedPodExecAsync(string name, string @namespace = "default", IEnumerable<string> command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, bool tty = true, string webSocketSubProtol = WebSocketProtocol.V4BinaryWebsocketProtocol, Dictionary<string, List<string>> customHeaders = null, CancellationToken cancellationToken = default(CancellationToken));
162+
112163
/// <summary>
113164
/// Start port forwarding one or more ports of a pod.
114165
/// </summary>
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace k8s
8+
{
9+
/// <summary>
10+
/// <para>
11+
/// The <see cref="IStreamDemuxer"/> interface allows you to interact with processes running in a container in a Kubernetes pod. You can start an exec or attach command
12+
/// by calling <see cref="Kubernetes.WebSocketNamespacedPodExecAsync(string, string, IEnumerable{string}, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>
13+
/// or <see cref="Kubernetes.WebSocketNamespacedPodAttachAsync(string, string, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>. These methods
14+
/// will return you a <see cref="WebSocket"/> connection.
15+
/// </para>
16+
/// <para>
17+
/// Kubernetes 'multiplexes' multiple channels over this <see cref="WebSocket"/> connection, such as standard input, standard output and standard error. The <see cref="StreamDemuxer"/>
18+
/// allows you to extract individual <see cref="Stream"/>s from this <see cref="WebSocket"/> class. You can then use these streams to send/receive data from that process.
19+
/// </para>
20+
/// </summary>
21+
public interface IStreamDemuxer : IDisposable
22+
{
23+
/// <summary>
24+
/// Starts reading the data sent by the server.
25+
/// </summary>
26+
void Start();
27+
28+
/// <summary>
29+
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
30+
/// </summary>
31+
/// <param name="inputIndex">
32+
/// The index of the channel from which to read.
33+
/// </param>
34+
/// <param name="outputIndex">
35+
/// The index of the channel to which to write.
36+
/// </param>
37+
/// <returns>
38+
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
39+
/// </returns>
40+
Stream GetStream(ChannelIndex? inputIndex, ChannelIndex? outputIndex);
41+
42+
/// <summary>
43+
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
44+
/// </summary>
45+
/// <param name="inputIndex">
46+
/// The index of the channel from which to read.
47+
/// </param>
48+
/// <param name="outputIndex">
49+
/// The index of the channel to which to write.
50+
/// </param>
51+
/// <returns>
52+
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
53+
/// </returns>
54+
Stream GetStream(byte? inputIndex, byte? outputIndex);
55+
56+
/// <summary>
57+
/// Directly writes data to a channel.
58+
/// </summary>
59+
/// <param name="index">
60+
/// The index of the channel to which to write.
61+
/// </param>
62+
/// <param name="buffer">
63+
/// The buffer from which to read data.
64+
/// </param>
65+
/// <param name="offset">
66+
/// The offset at which to start reading.
67+
/// </param>
68+
/// <param name="count">
69+
/// The number of bytes to read.
70+
/// </param>
71+
/// <param name="cancellationToken">
72+
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
73+
/// </param>
74+
/// <returns>
75+
/// A <see cref="Task"/> which represents the asynchronous operation.
76+
/// </returns>
77+
Task Write(ChannelIndex index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken));
78+
79+
/// <summary>
80+
/// Directly writes data to a channel.
81+
/// </summary>
82+
/// <param name="index">
83+
/// The index of the channel to which to write.
84+
/// </param>
85+
/// <param name="buffer">
86+
/// The buffer from which to read data.
87+
/// </param>
88+
/// <param name="offset">
89+
/// The offset at which to start reading.
90+
/// </param>
91+
/// <param name="count">
92+
/// The number of bytes to read.
93+
/// </param>
94+
/// <param name="cancellationToken">
95+
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
96+
/// </param>
97+
/// <returns>
98+
/// A <see cref="Task"/> which represents the asynchronous operation.
99+
/// </returns>
100+
Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken));
101+
}
102+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using k8s.Models;
2+
using Microsoft.Rest;
3+
using Microsoft.Rest.Serialization;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.IO;
7+
using System.Linq;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace k8s
12+
{
13+
public partial class Kubernetes
14+
{
15+
public async Task<int> NamespacedPodExecAsync(string name, string @namespace, string container, IEnumerable<string> command, bool tty, ExecAsyncCallback action, CancellationToken cancellationToken)
16+
{
17+
// All other parameters are being validated by MuxedStreamNamespacedPodExecAsync
18+
if (action == null)
19+
{
20+
throw new ArgumentNullException(nameof(action));
21+
}
22+
23+
try
24+
{
25+
using (var muxedStream = await this.MuxedStreamNamespacedPodExecAsync(name: name, @namespace: @namespace, command: command, container: container, tty: tty, cancellationToken: cancellationToken).ConfigureAwait(false))
26+
using (Stream stdIn = muxedStream.GetStream(null, ChannelIndex.StdIn))
27+
using (Stream stdOut= muxedStream.GetStream(ChannelIndex.StdOut, null))
28+
using (Stream stdErr = muxedStream.GetStream(ChannelIndex.StdErr, null))
29+
using (Stream error = muxedStream.GetStream(ChannelIndex.Error, null))
30+
using (StreamReader errorReader = new StreamReader(error))
31+
{
32+
muxedStream.Start();
33+
34+
await action(stdIn, stdOut, stdErr).ConfigureAwait(false);
35+
36+
var errors = await errorReader.ReadToEndAsync().ConfigureAwait(false);
37+
38+
// StatusError is defined here:
39+
// https://github.com/kubernetes/kubernetes/blob/068e1642f63a1a8c48c16c18510e8854a4f4e7c5/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go#L37
40+
var returnMessage = SafeJsonConvert.DeserializeObject<V1Status>(errors);
41+
return GetExitCodeOrThrow(returnMessage);
42+
}
43+
}
44+
catch (HttpOperationException httpEx) when (httpEx.Body is V1Status)
45+
{
46+
throw new KubernetesException((V1Status)httpEx.Body);
47+
}
48+
}
49+
50+
/// <summary>
51+
/// Determines the process' exit code based on a <see cref="V1Status"/> message.
52+
///
53+
/// This will:
54+
/// - return 0 if the process completed successfully
55+
/// - return the exit code if the process completed with a non-zero exit code
56+
/// - throw a <see cref="KubernetesException"/> in all other cases.
57+
/// </summary>
58+
/// <param name="status">
59+
/// A <see cref="V1Status"/> object.
60+
/// </param>
61+
/// <returns>
62+
/// The process exit code.
63+
/// </returns>
64+
public static int GetExitCodeOrThrow(V1Status status)
65+
{
66+
if (status == null)
67+
{
68+
throw new ArgumentNullException(nameof(status));
69+
}
70+
71+
if (status.Status == "Success")
72+
{
73+
return 0;
74+
}
75+
else if (status.Status == "Failure" && status.Reason == "NonZeroExitCode")
76+
{
77+
var exitCodeString = status.Details.Causes.FirstOrDefault(c => c.Reason == "ExitCode")?.Message;
78+
79+
if (int.TryParse(exitCodeString, out int exitCode))
80+
{
81+
return exitCode;
82+
}
83+
else
84+
{
85+
throw new KubernetesException(status);
86+
}
87+
}
88+
else
89+
{
90+
throw new KubernetesException(status);
91+
}
92+
}
93+
}
94+
}

src/KubernetesClient/Kubernetes.WebSocket.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,15 @@ public partial class Kubernetes
3232
}
3333

3434
/// <inheritdoc/>
35-
public Task<WebSocket> WebSocketNamespacedPodExecAsync(string name, string @namespace = "default", IEnumerable<string> command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, bool tty = true, string webSocketSubProtol = null, Dictionary<string, List<string>> customHeaders = null, CancellationToken cancellationToken = default(CancellationToken))
35+
public virtual async Task<IStreamDemuxer> MuxedStreamNamespacedPodExecAsync(string name, string @namespace = "default", IEnumerable<string> command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, bool tty = true, string webSocketSubProtol = WebSocketProtocol.V4BinaryWebsocketProtocol, Dictionary<string, List<string>> customHeaders = null, CancellationToken cancellationToken = default(CancellationToken))
36+
{
37+
WebSocket webSocket = await this.WebSocketNamespacedPodExecAsync(name: name, @namespace: @namespace, command: command, container: container, tty: tty, cancellationToken: cancellationToken).ConfigureAwait(false);
38+
StreamDemuxer muxer = new StreamDemuxer(webSocket);
39+
return muxer;
40+
}
41+
42+
/// <inheritdoc/>
43+
public virtual Task<WebSocket> WebSocketNamespacedPodExecAsync(string name, string @namespace = "default", IEnumerable<string> command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, bool tty = true, string webSocketSubProtol = WebSocketProtocol.V4BinaryWebsocketProtocol, Dictionary<string, List<string>> customHeaders = null, CancellationToken cancellationToken = default(CancellationToken))
3644
{
3745
if (name == null)
3846
{
@@ -54,6 +62,20 @@ public partial class Kubernetes
5462
throw new ArgumentOutOfRangeException(nameof(command));
5563
}
5664

65+
if (command == null)
66+
{
67+
throw new ArgumentNullException(nameof(command));
68+
}
69+
70+
var commandArray = command.ToArray();
71+
foreach (var c in commandArray)
72+
{
73+
if (c.Length > 0 && c[0] == 0xfeff)
74+
{
75+
throw new InvalidOperationException($"Detected an attempt to execute a command which starts with a Unicode byte order mark (BOM). This is probably incorrect. The command was {c}");
76+
}
77+
}
78+
5779
// Tracing
5880
bool _shouldTrace = ServiceClientTracing.IsEnabled;
5981
string _invocationId = null;

src/KubernetesClient/MuxedStream.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputInde
3434
throw new ArgumentException("You must specify at least inputBuffer or outputIndex");
3535
}
3636

37-
this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
37+
if (outputIndex != null)
38+
{
39+
this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
40+
}
3841
}
3942

4043
/// <inheritdoc/>

0 commit comments

Comments
 (0)