35
35
36
36
import java .io .File ;
37
37
import java .io .IOException ;
38
+ import java .nio .ByteBuffer ;
38
39
import java .util .concurrent .atomic .AtomicInteger ;
40
+ import java .util .concurrent .atomic .AtomicLong ;
39
41
40
42
/**
41
43
* @author kimchy (shay.banon)
@@ -54,7 +56,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
54
56
55
57
private final AtomicInteger operationCounter = new AtomicInteger ();
56
58
57
- private long lastPosition = 0 ;
59
+ private AtomicLong lastPosition = new AtomicLong (0 );
60
+ private AtomicLong lastWrittenPosition = new AtomicLong (0 );
58
61
59
62
private RafReference raf ;
60
63
@@ -113,7 +116,8 @@ public File location() {
113
116
@ Override public void newTranslog () throws TranslogException {
114
117
synchronized (mutex ) {
115
118
operationCounter .set (0 );
116
- lastPosition = 0 ;
119
+ lastPosition .set (0 );
120
+ lastWrittenPosition .set (0 );
117
121
this .id = id + 1 ;
118
122
if (raf != null ) {
119
123
raf .decreaseRefCount (true );
@@ -131,7 +135,8 @@ public File location() {
131
135
@ Override public void newTranslog (long id ) throws TranslogException {
132
136
synchronized (mutex ) {
133
137
operationCounter .set (0 );
134
- lastPosition = 0 ;
138
+ lastPosition .set (0 );
139
+ lastWrittenPosition .set (0 );
135
140
this .id = id ;
136
141
if (raf != null ) {
137
142
raf .decreaseRefCount (true );
@@ -158,12 +163,15 @@ public File location() {
158
163
out .seek (0 );
159
164
out .writeInt (size - 4 );
160
165
166
+ long position = lastPosition .getAndAdd (size );
167
+ // use channel#write and not raf#write since it allows for concurrent writes
168
+ // with regards to positions
169
+ raf .channel ().write (ByteBuffer .wrap (out .unsafeByteArray (), 0 , size ), position );
170
+ if (syncOnEachOperation ) {
171
+ raf .channel ().force (false );
172
+ }
161
173
synchronized (mutex ) {
162
- raf .raf ().write (out .unsafeByteArray (), 0 , size );
163
- if (syncOnEachOperation ) {
164
- sync ();
165
- }
166
- lastPosition += size ;
174
+ lastWrittenPosition .getAndAdd (size );
167
175
operationCounter .incrementAndGet ();
168
176
}
169
177
} catch (Exception e ) {
@@ -175,10 +183,11 @@ public File location() {
175
183
synchronized (mutex ) {
176
184
try {
177
185
raf .increaseRefCount ();
186
+ raf .channel ().force (true ); // sync here, so we make sure we read back teh data?
178
187
if (useStream ) {
179
- return new FsStreamSnapshot (shardId , this .id , raf , lastPosition , operationCounter .get (), operationCounter .get ());
188
+ return new FsStreamSnapshot (shardId , this .id , raf , lastWrittenPosition . get () , operationCounter .get (), operationCounter .get ());
180
189
} else {
181
- return new FsChannelSnapshot (shardId , this .id , raf , lastPosition , operationCounter .get (), operationCounter .get ());
190
+ return new FsChannelSnapshot (shardId , this .id , raf , lastWrittenPosition . get () , operationCounter .get (), operationCounter .get ());
182
191
}
183
192
} catch (Exception e ) {
184
193
throw new TranslogException (shardId , "Failed to snapshot" , e );
@@ -193,12 +202,13 @@ public File location() {
193
202
}
194
203
try {
195
204
raf .increaseRefCount ();
205
+ raf .channel ().force (true ); // sync here, so we make sure we read back teh data?
196
206
if (useStream ) {
197
- FsStreamSnapshot newSnapshot = new FsStreamSnapshot (shardId , id , raf , lastPosition , operationCounter .get (), operationCounter .get () - snapshot .totalOperations ());
207
+ FsStreamSnapshot newSnapshot = new FsStreamSnapshot (shardId , id , raf , lastWrittenPosition . get () , operationCounter .get (), operationCounter .get () - snapshot .totalOperations ());
198
208
newSnapshot .seekForward (snapshot .position ());
199
209
return newSnapshot ;
200
210
} else {
201
- FsChannelSnapshot newSnapshot = new FsChannelSnapshot (shardId , id , raf , lastPosition , operationCounter .get (), operationCounter .get () - snapshot .totalOperations ());
211
+ FsChannelSnapshot newSnapshot = new FsChannelSnapshot (shardId , id , raf , lastWrittenPosition . get () , operationCounter .get (), operationCounter .get () - snapshot .totalOperations ());
202
212
newSnapshot .seekForward (snapshot .position ());
203
213
return newSnapshot ;
204
214
}
@@ -212,7 +222,7 @@ public File location() {
212
222
synchronized (mutex ) {
213
223
if (raf != null ) {
214
224
try {
215
- raf .raf ().getFD (). sync ( );
225
+ raf .channel ().force ( true );
216
226
} catch (Exception e ) {
217
227
// ignore
218
228
}
0 commit comments