@@ -49,16 +49,20 @@ public class ThriftRestImpl extends AbstractComponent implements Rest.Iface {
49
49
logger .trace ("thrift message {}" , request );
50
50
}
51
51
final CountDownLatch latch = new CountDownLatch (1 );
52
- final AtomicReference <RestResponse > ref = new AtomicReference <RestResponse >();
52
+ final AtomicReference <org . elasticsearch . thrift . RestResponse > ref = new AtomicReference <org . elasticsearch . thrift . RestResponse >();
53
53
restController .dispatchRequest (new ThriftRestRequest (request ), new RestChannel () {
54
54
@ Override public void sendResponse (RestResponse response ) {
55
- ref .set (response );
55
+ try {
56
+ ref .set (convert (response ));
57
+ } catch (IOException e ) {
58
+ // ignore, should not happen...
59
+ }
56
60
latch .countDown ();
57
61
}
58
62
});
59
63
try {
60
64
latch .await ();
61
- return convert ( ref .get () );
65
+ return ref .get ();
62
66
} catch (Exception e ) {
63
67
throw new TException ("failed to generate response" , e );
64
68
}
@@ -67,7 +71,14 @@ public class ThriftRestImpl extends AbstractComponent implements Rest.Iface {
67
71
private org .elasticsearch .thrift .RestResponse convert (RestResponse response ) throws IOException {
68
72
org .elasticsearch .thrift .RestResponse tResponse = new org .elasticsearch .thrift .RestResponse (getStatus (response .status ()));
69
73
if (response .contentLength () > 0 ) {
70
- tResponse .setBody (ByteBuffer .wrap (response .content (), 0 , response .contentLength ()));
74
+ if (response .contentThreadSafe ()) {
75
+ tResponse .setBody (ByteBuffer .wrap (response .content (), 0 , response .contentLength ()));
76
+ } else {
77
+ // argh!, we need to copy it over since we are not on the same thread...
78
+ byte [] body = new byte [response .contentLength ()];
79
+ System .arraycopy (response .content (), 0 , body , 0 , response .contentLength ());
80
+ tResponse .setBody (ByteBuffer .wrap (body ));
81
+ }
71
82
}
72
83
return tResponse ;
73
84
}
0 commit comments