Skip to content

Commit 21fa8c3

Browse files
gustavocovasjohanbrandhorst
authored andcommitted
Fix EOF handling in client side streaming (grpc-ecosystem#962)
* Add integration test for grpc-ecosystem#961 Co-authored-by: Jonas Arilho <[email protected]> * Add verification for io.EOF after stream.Send() on generated code template (grpc-ecosystem#961) Co-authored-by: Jonas Arilho <[email protected]> * Add more values on testABEBulkCreateWithError, run go mod tidy Fixes grpc-ecosystem#961
1 parent 0d29f14 commit 21fa8c3

File tree

5 files changed

+78
-1
lines changed

5 files changed

+78
-1
lines changed

examples/integration/integration_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"strings"
1616
"sync"
1717
"testing"
18+
"time"
1819

1920
"github.com/golang/protobuf/jsonpb"
2021
"github.com/golang/protobuf/proto"
@@ -263,6 +264,7 @@ func TestABE(t *testing.T) {
263264
testABECreate(t, 8080)
264265
testABECreateBody(t, 8080)
265266
testABEBulkCreate(t, 8080)
267+
testABEBulkCreateWithError(t, 8080)
266268
testABELookup(t, 8080)
267269
testABELookupNotFound(t, 8080)
268270
testABEList(t, 8080)
@@ -549,6 +551,65 @@ func testABEBulkCreate(t *testing.T, port int) {
549551
}
550552
}
551553

554+
func testABEBulkCreateWithError(t *testing.T, port int) {
555+
count := 0
556+
r, w := io.Pipe()
557+
go func(w io.WriteCloser) {
558+
defer func() {
559+
if cerr := w.Close(); cerr != nil {
560+
t.Errorf("w.Close() failed with %v; want success", cerr)
561+
}
562+
}()
563+
for _, val := range []string{
564+
"foo", "bar", "baz", "qux", "quux",
565+
} {
566+
time.Sleep(1 * time.Millisecond)
567+
568+
want := gw.ABitOfEverything{
569+
StringValue: fmt.Sprintf("strprefix/%s", val),
570+
}
571+
var m jsonpb.Marshaler
572+
if err := m.Marshal(w, &want); err != nil {
573+
t.Fatalf("m.Marshal(%#v, w) failed with %v; want success", want, err)
574+
}
575+
if _, err := io.WriteString(w, "\n"); err != nil {
576+
t.Errorf("w.Write(%q) failed with %v; want success", "\n", err)
577+
return
578+
}
579+
count++
580+
}
581+
}(w)
582+
583+
apiURL := fmt.Sprintf("http://localhost:%d/v1/example/a_bit_of_everything/bulk", port)
584+
request, err := http.NewRequest("POST", apiURL, r)
585+
if err != nil {
586+
t.Fatalf("http.NewRequest(%q, %q, nil) failed with %v; want success", "POST", apiURL, err)
587+
}
588+
request.Header.Add("Grpc-Metadata-error", "some error")
589+
590+
resp, err := http.DefaultClient.Do(request)
591+
if err != nil {
592+
t.Errorf("http.Post(%q) failed with %v; want success", apiURL, err)
593+
return
594+
}
595+
defer resp.Body.Close()
596+
buf, err := ioutil.ReadAll(resp.Body)
597+
if err != nil {
598+
t.Errorf("ioutil.ReadAll(resp.Body) failed with %v; want success", err)
599+
return
600+
}
601+
602+
if got, want := resp.StatusCode, http.StatusBadRequest; got != want {
603+
t.Errorf("resp.StatusCode = %d; want %d", got, want)
604+
t.Logf("%s", buf)
605+
}
606+
607+
var msg errorBody
608+
if err := json.Unmarshal(buf, &msg); err != nil {
609+
t.Fatalf("json.Unmarshal(%s, &msg) failed with %v; want success", buf, err)
610+
}
611+
}
612+
552613
func testABELookup(t *testing.T, port int) {
553614
apiURL := fmt.Sprintf("http://localhost:%d/v1/example/a_bit_of_everything", port)
554615
cresp, err := http.Post(apiURL, "application/json", strings.NewReader(`

examples/proto/examplepb/flow_combination.pb.gw.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/proto/examplepb/stream.pb.gw.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/server/a_bit_of_everything.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,15 @@ func (s *_ABitOfEverythingServer) CreateBody(ctx context.Context, msg *examples.
6565
}
6666

6767
func (s *_ABitOfEverythingServer) BulkCreate(stream examples.StreamService_BulkCreateServer) error {
68-
count := 0
6968
ctx := stream.Context()
69+
70+
if header, ok := metadata.FromIncomingContext(ctx); ok {
71+
if v, ok := header["error"]; ok {
72+
return status.Errorf(codes.InvalidArgument, "error metadata: %v", v)
73+
}
74+
}
75+
76+
count := 0
7077
for {
7178
msg, err := stream.Recv()
7279
if err == io.EOF {

protoc-gen-grpc-gateway/gengateway/template.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ func request_{{.Method.Service.GetName}}_{{.Method.GetName}}_{{.Index}}(ctx cont
254254
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
255255
}
256256
if err = stream.Send(&protoReq); err != nil {
257+
if err == io.EOF {
258+
break
259+
}
257260
grpclog.Infof("Failed to send request: %v", err)
258261
return nil, metadata, err
259262
}

0 commit comments

Comments
 (0)