Skip to content

Commit e23cfe2

Browse files
committed
Fixed issue where null values cause exception, and make sure state save exceptions bubble up
1 parent 4dcb298 commit e23cfe2

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

src/main/java/storm/trident/state/postgresql/PostgresqlState.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import storm.trident.state.map.OpaqueMap;
2828
import storm.trident.state.map.TransactionalMap;
2929
import backtype.storm.task.IMetricsContext;
30+
import backtype.storm.topology.FailedException;
3031

3132
import com.google.common.base.Function;
3233
import com.google.common.base.Joiner;
@@ -136,7 +137,7 @@ public void multiPut(final List<List<Object>> keys, final List<T> values) {
136137
.append(buildColumns())
137138
.append(") AS (")
138139
.append("VALUES ")
139-
.append(Joiner.on(", ").join(repeat("(" + Joiner.on(",").join(repeat("?", paramCount)) + ")", pkeys.size())))
140+
.append(Joiner.on(", ").join(repeat("(" + buildValueParams() + ")", pkeys.size())))
140141
.append("),")
141142
.append("updated AS (")
142143
.append("UPDATE ").append(config.getTable()).append(" t ")
@@ -198,6 +199,7 @@ public String apply(final String col) {
198199
ps.execute();
199200
} catch (final SQLException ex) {
200201
logger.error("Multiput update failed", ex);
202+
throw new FailedException(ex);
201203
} finally {
202204
if (ps != null) {
203205
try {
@@ -217,6 +219,16 @@ private String buildColumns() {
217219
return Joiner.on(",").join(cols);
218220
}
219221

222+
private String buildValueParams() {
223+
final List<String> types = Lists.newArrayList(config.getKeyTypes()); // the columns for the composite unique key
224+
types.addAll(getValueTypes());
225+
List<String> withCast = new ArrayList<>();
226+
for(String type : types) {
227+
withCast.add(String.format("?::%s", type));
228+
}
229+
return Joiner.on(",").join(withCast);
230+
}
231+
220232
private String buildKeyQuery(final int n) {
221233
final String single = "(" + Joiner.on(" AND ").join(Lists.transform(Arrays.asList(config.getKeyColumns()), new Function<String, String>() {
222234
@Override
@@ -243,6 +255,17 @@ public String apply(final String field) {
243255
return cols;
244256
}
245257

258+
private List<String> getValueTypes() {
259+
final List<String> cols = Lists.newArrayList(config.getValueTypes()); // the columns storing the values
260+
if (StateType.OPAQUE.equals(config.getType()) || StateType.TRANSACTIONAL.equals(config.getType())) {
261+
cols.add("bigint");
262+
}
263+
if (StateType.OPAQUE.equals(config.getType())) {
264+
cols.addAll(Lists.newArrayList(config.getValueTypes()));
265+
}
266+
return cols;
267+
}
268+
246269
/**
247270
* run the multi get query, passing in the list of keys and returning key tuples mapped to value tuples
248271
*

src/main/java/storm/trident/state/postgresql/PostgresqlStateConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public class PostgresqlStateConfig implements Serializable {
1212
private StateType type = StateType.OPAQUE;
1313
private String[] keyColumns;
1414
private String[] valueColumns;
15+
private String[] valueTypes;
16+
private String[] keyTypes;
1517
private int batchSize = DEFAULT_BATCH_SIZE;
1618
private int cacheSize = DEFAULT_CACHE_SIZE;
1719

@@ -74,4 +76,19 @@ public void setBatchSize(int batchSize) {
7476
this.batchSize = batchSize;
7577
}
7678

79+
public String[] getValueTypes() {
80+
return valueTypes;
81+
}
82+
83+
public void setValueTypes(String[] valueTypes) {
84+
this.valueTypes = valueTypes;
85+
}
86+
87+
public String[] getKeyTypes() {
88+
return keyTypes;
89+
}
90+
91+
public void setKeyTypes(String[] keyTypes) {
92+
this.keyTypes = keyTypes;
93+
}
7794
}

0 commit comments

Comments
 (0)