Skip to content

Commit c4b3f8d

Browse files
committed
refactor config to have a nicer constructor
1 parent 73942d0 commit c4b3f8d

File tree

3 files changed

+112
-83
lines changed

3 files changed

+112
-83
lines changed

src/main/java/storm/trident/state/mysql/MysqlState.java

Lines changed: 87 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.ArrayList;
99
import java.util.Arrays;
1010
import java.util.HashMap;
11+
import java.util.Iterator;
1112
import java.util.List;
1213
import java.util.Map;
1314

@@ -62,39 +63,42 @@ public static Factory newFactory(final MysqlStateConfig config) {
6263
@Override
6364
@SuppressWarnings({"unchecked","rawtypes"})
6465
public List<T> multiGet(final List<List<Object>> keys) {
65-
if (keys.isEmpty()) {
66-
return new ArrayList<>();
67-
}
68-
// build a query using select key1, keys2, ..., val1, val2, ..., [txid], [prev_val1], ... FROM table WHERE (key1 = ? AND keys = ? ...) OR ...
69-
final StringBuilder queryBuilder = new StringBuilder().append("SELECT ")
70-
.append(buildColumns())
71-
.append(" FROM ")
72-
.append(config.getTable())
73-
.append(" WHERE ")
74-
.append(buildKeyQuery(keys.size()));
75-
final Map<List<Object>, List<Object>> queryResults = query(queryBuilder.toString(), keys);
76-
// build the value list by ordering based on the input keys and looking up the query results, transform to transactional and opaque values as needed
77-
return Lists.transform(keys, new Function<List<Object>, T>() {
78-
@Override
79-
public T apply(final List<Object> key) {
80-
final List<Object> values = queryResults.get(key);
81-
if (values == null) {
82-
return null;
83-
} else {
84-
switch (config.getType()) {
85-
case OPAQUE: // partition the values list into 3 values [current], [txid], [prev]
86-
return (T) new OpaqueValue((Long) values.get(config.getValueColumns().length), // txid
87-
values.subList(0, config.getValueColumns().length), // curr values
88-
values.subList(config.getValueColumns().length, values.size())); // prev values
89-
case TRANSACTIONAL:
90-
return (T) new TransactionalValue((Long) values.get(config.getValueColumns().length), // txid
91-
values.subList(0, config.getValueColumns().length)); // curr values
92-
default:
93-
return (T) values;
66+
final List<List<List<Object>>> partitionedKeys = Lists.partition(keys, config.getBatchSize());
67+
final List<T> result = new ArrayList<>();
68+
for (final List<List<Object>> pkeys : partitionedKeys) {
69+
// build a query of the form SELECT key1, key2, ..., val1, val2, ..., [txid], [prev_val1], ... FROM table WHERE (key1 = ? AND key2 = ? ...) OR ...
70+
final StringBuilder queryBuilder = new StringBuilder().append("SELECT ")
71+
.append(buildColumns())
72+
.append(" FROM ")
73+
.append(config.getTable())
74+
.append(" WHERE ")
75+
.append(buildKeyQuery(pkeys.size()));
76+
final Map<List<Object>, List<Object>> queryResults = query(queryBuilder.toString(), pkeys);
77+
// build the value list by ordering based on the input keys and looking up the query results, transform to transactional and opaque values as needed
78+
result.addAll(Lists.transform(pkeys, new Function<List<Object>, T>() {
79+
@Override
80+
public T apply(final List<Object> key) {
81+
final List<Object> values = queryResults.get(key);
82+
if (values == null) {
83+
return null;
84+
} else {
85+
switch (config.getType()) {
86+
case OPAQUE: // partition the values list into 3 values [current], [txid], [prev]
87+
return (T) new OpaqueValue((Long) values.get(config.getValueColumns().length), // txid
88+
values.subList(0, config.getValueColumns().length), // curr values
89+
values.subList(config.getValueColumns().length, values.size())); // prev values
90+
case TRANSACTIONAL:
91+
return (T) new TransactionalValue((Long) values.get(config.getValueColumns().length), // txid
92+
values.subList(0, config.getValueColumns().length)); // curr values
93+
default:
94+
return (T) values;
95+
}
9496
}
9597
}
96-
}
97-
});
98+
}));
99+
logger.debug(String.format("%1$d keys retrieved", pkeys.size()));
100+
}
101+
return result;
98102
}
99103

100104
/**
@@ -103,55 +107,62 @@ public T apply(final List<Object> key) {
103107
*/
104108
@Override
105109
public void multiPut(final List<List<Object>> keys, final List<T> values) {
106-
// build a query insert into table(key1, key2, ..., value1, value2, ... , [txid], [prev_val1], ...) values (?,?,...), ... ON DUPLICATE KEY UPDATE value1 = VALUES(value1), ...
107-
// how many params per row of data
108-
// opaque => keys + 2 * vals + 1
109-
// transactional => keys + vals + 1
110-
// non-transactional => keys + vals
111-
int paramCount = 0;
112-
switch (config.getType()) {
113-
case OPAQUE:
114-
paramCount += config.getValueColumns().length;
115-
case TRANSACTIONAL:
116-
paramCount += 1;
117-
default:
118-
paramCount += (config.getKeyColumns().length + config.getValueColumns().length);
119-
}
120-
final StringBuilder queryBuilder = new StringBuilder().append("INSERT INTO ")
121-
.append(config.getTable())
122-
.append("(")
123-
.append(buildColumns())
124-
.append(") VALUES ")
125-
.append(Joiner.on(",").join(repeat("(" + Joiner.on(",").join(repeat("?", paramCount)) + ")", keys.size())))
126-
.append(" ON DUPLICATE KEY UPDATE ")
127-
.append(Joiner.on(",").join(Lists.transform(getValueColumns(), new Function<String, String>() {
128-
@Override
129-
public String apply(final String col) {
130-
return col + " = VALUES(" + col + ")";
131-
}
132-
}))); // for every value column, constructs "valcol = VALUE(valcol)", joined by commas
133-
// run the update
134-
final List<Object> params = flattenPutParams(keys, values);
135-
PreparedStatement ps = null;
136-
int i = 0;
137-
try {
138-
ps = connection.prepareStatement(queryBuilder.toString());
139-
for (final Object param : params) {
140-
ps.setObject(++i, param);
110+
// partition the keys and the values into batches of at most config.getBatchSize() entries and run one insert per batch
111+
final Iterator<List<List<Object>>> partitionedKeys = Lists.partition(keys, config.getBatchSize()).iterator();
112+
final Iterator<List<T>> partitionedValues = Lists.partition(values, config.getBatchSize()).iterator();
113+
while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
114+
final List<List<Object>> pkeys = partitionedKeys.next();
115+
final List<T> pvalues = partitionedValues.next();
116+
// build a query insert into table(key1, key2, ..., value1, value2, ... , [txid], [prev_val1], ...) values (?,?,...), ... ON DUPLICATE KEY UPDATE value1 = VALUES(value1), ...
117+
// how many params per row of data
118+
// opaque => keys + 2 * vals + 1
119+
// transactional => keys + vals + 1
120+
// non-transactional => keys + vals
121+
int paramCount = 0;
122+
switch (config.getType()) {
123+
case OPAQUE:
124+
paramCount += config.getValueColumns().length;
125+
case TRANSACTIONAL:
126+
paramCount += 1;
127+
default:
128+
paramCount += (config.getKeyColumns().length + config.getValueColumns().length);
141129
}
142-
ps.execute();
143-
} catch (final SQLException ex) {
144-
logger.error("Multiput update failed", ex);
145-
} finally {
146-
if (ps != null) {
147-
try {
148-
ps.close();
149-
} catch (SQLException ex) {
150-
// don't care
130+
final StringBuilder queryBuilder = new StringBuilder().append("INSERT INTO ")
131+
.append(config.getTable())
132+
.append("(")
133+
.append(buildColumns())
134+
.append(") VALUES ")
135+
.append(Joiner.on(",").join(repeat("(" + Joiner.on(",").join(repeat("?", paramCount)) + ")", pkeys.size())))
136+
.append(" ON DUPLICATE KEY UPDATE ")
137+
.append(Joiner.on(",").join(Lists.transform(getValueColumns(), new Function<String, String>() {
138+
@Override
139+
public String apply(final String col) {
140+
return col + " = VALUES(" + col + ")";
141+
}
142+
}))); // for every value column, constructs "valcol = VALUES(valcol)", joined by commas
143+
// run the update
144+
final List<Object> params = flattenPutParams(pkeys, pvalues);
145+
PreparedStatement ps = null;
146+
int i = 0;
147+
try {
148+
ps = connection.prepareStatement(queryBuilder.toString());
149+
for (final Object param : params) {
150+
ps.setObject(++i, param);
151+
}
152+
ps.execute();
153+
} catch (final SQLException ex) {
154+
logger.error("Multiput update failed", ex);
155+
} finally {
156+
if (ps != null) {
157+
try {
158+
ps.close();
159+
} catch (SQLException ex) {
160+
// don't care
161+
}
151162
}
152163
}
164+
logger.debug(String.format("%1$d keys flushed", pkeys.size()));
153165
}
154-
logger.debug(String.format("%1$d keys flushed", keys.size()));
155166
}
156167

157168
private String buildColumns() {

src/main/java/storm/trident/state/mysql/MysqlStateConfig.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ public class MysqlStateConfig implements Serializable {
1212
private StateType type = StateType.OPAQUE;
1313
private String[] keyColumns;
1414
private String[] valueColumns;
15-
private int cacheSize = DEFAULT;
15+
private int batchSize = DEFAULT_BATCH_SIZE;
16+
private int cacheSize = DEFAULT_CACHE_SIZE;
1617

17-
private static final int DEFAULT = 5000;
18+
private static final int DEFAULT_CACHE_SIZE = 5000;
19+
private static final int DEFAULT_BATCH_SIZE = 5000;
1820

1921
public String getUrl() {
2022
return url;
@@ -64,4 +66,12 @@ public void setValueColumns(String[] valueColumns) {
6466
this.valueColumns = valueColumns;
6567
}
6668

69+
public int getBatchSize() {
70+
return batchSize;
71+
}
72+
73+
public void setBatchSize(int batchSize) {
74+
this.batchSize = batchSize;
75+
}
76+
6777
}

src/test/java/storm/trident/mysql/MysqlStateTopology.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,20 @@ public Fields getOutputFields() {
6565
}
6666

6767
@SuppressWarnings("serial")
68-
static class LoggingFilter extends BaseFilter {
68+
static class ThroughputLoggingFilter extends BaseFilter {
6969

70-
private static final Logger logger = Logger.getLogger(LoggingFilter.class);
70+
private static final Logger logger = Logger.getLogger(ThroughputLoggingFilter.class);
71+
private long count = 0;
72+
private Long start = System.nanoTime();
73+
private Long last = System.nanoTime();
7174

7275
public boolean isKeep(final TridentTuple tuple) {
73-
logger.info(tuple);
76+
count += 1;
77+
final long now = System.nanoTime();
78+
if (now - last > 5000000000L) { // emit every 5 seconds
79+
logger.info("tuples per second = " + (count * 1000000000L) / (now - start));
80+
last = now;
81+
}
7482
return true;
7583
}
7684
}
@@ -101,9 +109,9 @@ public List<Number> zero() {
101109
* @param args
102110
*/
103111
public static void main(final String[] args) {
104-
final String dburl = "jdbc:mysql://mysql1:3306/storm_test?user=???&password=???";
112+
final String dburl = "jdbc:mysql://mysql1:3306/storm_test?user=pmp&password=pmp";
105113
final TridentTopology topology = new TridentTopology();
106-
final GroupedStream stream = topology.newStream("test", new RandomTupleSpout()).groupBy(new Fields("a"));
114+
final GroupedStream stream = topology.newStream("test", new RandomTupleSpout()).each(new Fields(), new ThroughputLoggingFilter()).groupBy(new Fields("a"));
107115
final MysqlStateConfig config = new MysqlStateConfig();
108116
{
109117
config.setUrl(dburl);

0 commit comments

Comments
 (0)