diff --git a/.gitignore b/.gitignore index a57f2ff..03f2482 100644 --- a/.gitignore +++ b/.gitignore @@ -23,20 +23,28 @@ ltmain.sh install-sh depcomp missing +compile +test-driver Makefile Makefile.in Doxyfile +doxy/ +doxyfile.stamp src/.deps src/.libs src/*.o src/*.lo src/*.la +src/bench src/testhash src/testsystem src/testutil src/testvlq src/sparkey src/test.sp* +src/*.log +src/*.trs + diff --git a/.travis.yml b/.travis.yml index 6786805..b791f22 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,6 @@ language: c before_install: - sudo apt-get update -qq - - sudo apt-get install -qq libsnappy-dev + - sudo apt-get install -qq libsnappy-dev libzstd-dev script: autoreconf --install && ./configure && make && make check diff --git a/README.md b/README.md index 3befeb2..4459d34 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,16 @@ library for working with sparkey index and log files (`libsparkey`), and a command line utility for getting info about and reading values from a sparkey index/log (`sparkey`). +### Travis +Continuous integration with [travis](https://travis-ci.org/spotify/sparkey). + +[![Build Status](https://travis-ci.org/spotify/sparkey.svg?branch=master)](https://travis-ci.org/spotify/sparkey) + Dependencies ------------ * GNU build system (autoconf, automake, libtool) -* [Snappy](https://code.google.com/p/snappy/) +* [Snappy](http://google.github.io/snappy/) Optional @@ -16,7 +21,6 @@ Optional Building -------- - autoreconf --install ./configure make @@ -26,8 +30,18 @@ API documentation can be generated with `doxygen`. Installing ---------- + sudo make install && sudo ldconfig - make install +Related projects +---------------- + +* [spotify/sparkey-python: Official python bindings](https://github.com/spotify/sparkey-python) +* [spotify/sparkey-java: Official java implementation](https://github.com/spotify/sparkey-java) +* [emnl/gnista: Unofficial ruby bindings](https://github.com/emnl/gnista) +* [adamtanner/sparkey: Unofficial ruby bindings](https://github.com/adamtanner/sparkey) +* [stephenmathieson/node-sparkey: Unofficial node bindings](https://github.com/stephenmathieson/node-sparkey) +* [tiegz/sparkey-go: Unofficial go bindings](https://github.com/tiegz/sparkey-go) +* [dflemstr/sparkey-rs: Unofficial rust bindings](https://github.com/dflemstr/sparkey-rs) Description ------------ @@ -62,6 +76,14 @@ users or other services. The fast and efficient bulk writes makes it feasible to and the fast random access reads makes it suitable for high throughput low latency services. For some services we have been able to saturate network interfaces while keeping cpu usage really low. +Limitations +----------- +The hash writing process requires memory allocation of num_entries * 16 * 1.3 bytes. +This means that you may run out of memory if trying to write a hash index for too many entries. +For instance, with 16 GB available RAM you may write 825 million entries. + +This limitation has been removed in sparkey-java, but it has not yet been implemented in this version. + Usage ----- Sparkey is meant to be used as a library embedded in other software. Take a look at the API documentation which gives examples on how to use it. @@ -196,8 +218,7 @@ Running it on a production-like server (Intel(R) Xeon(R) CPU L5630 @ 2.13GHz) we File format details ------------------- -Log file format ---------------- +### Log file format The contents of the log file starts with a constant size header, describing some metadata about the log file. After that is just a sequence of entries, where each entry consists of a type, key and a value. @@ -207,8 +228,7 @@ If A > 0, it's a PUT and the key length is A - 1, and the value length is B. (It gets slightly more complex if block level compression is used, but we'll ignore that for now.) -Hash file format ----------------- +### Hash file format The contents of the hash file starts with a constant size header, similarly to the log file. The rest of the file is a hash table, represented as capacity * slotsize bytes. The capacity is simply an upper bound of the number of live entries multiplied by a hash density factor > 1.0. @@ -223,6 +243,8 @@ Hash lookup algorithm ---------------------- One of few non-trivial parts in Sparkey is the way it does hash lookups. With hashtables there is always a risk of collisions. Even if the hash itself may not collide, the assigned slots may. +(It recently came to my attention that the method described below is basically the same thing as Robin Hood hashing with backward shift deletion) + Let's define displacement as the distance from the calculated optimal slot for a given hash to the slot it's actually placed in. Distance in this case is defined as the number of steps you need to move forward from your optimal slot to reach the actual slot. The trivial and naive solution for this is to simply start with an empty hash table, move through the entries and put them in the first available slot, starting from the optimal slot, and this is almost what we do. @@ -234,7 +256,123 @@ If we consider the average displacement, we can't really do better than that. We It's very easy to set up the hash table like this, we just need to do insertions into slots instead of appends. As soon as we reach a slot with a smaller displacement than our own, we shift the following slots up until the first empty slot one step and insert our own element. +Let's illustrate it with an example - let's start off with an empty hash table with a capacity of 7: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | | | + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | | | + +------------+------------+ + slot 4 | | | + +------------+------------+ + slot 5 | | | + +------------+------------+ + slot 6 | | | + +------------+------------+ + +We add the key "key0" which happens to end up in slot 3, h("key0") % 7 == 3. The slot is empty, so this is trivial: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | | | + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | h(key0) | 1 | 3 0 + +------------+------------+ + slot 4 | | | + +------------+------------+ + slot 5 | | | + +------------+------------+ + slot 6 | | | + +------------+------------+ + +Now we add "key1" which happens to end up in slot 4: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | | | + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | h(key0) | 1 | 3 0 + +------------+------------+ + slot 4 | h(key1) | 11 | 4 0 + +------------+------------+ + slot 5 | | | + +------------+------------+ + slot 6 | | | + +------------+------------+ + +Now we add "key2" which also wants to be in slot 3. This is a conflict, so we skip forward until we found a slot which has a lower displacement than our current displacement. +When we find that slot, all following entries until the next empty slot move down one step: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | | | + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | h(key0) | 1 | 3 0 + +------------+------------+ + slot 4 | h(key2) | 21 | 3 1 + +------------+------------+ + slot 5 | h(key1) | 11 | 4 1 + +------------+------------+ + slot 6 | | | + +------------+------------+ + +Let's add "key3" which maps to slot 5. We can't push down key1, because it already has displacement 1 and our current displacement for key3 is 0, so we have to move forward: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | | | + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | h(key0) | 1 | 3 0 + +------------+------------+ + slot 4 | h(key2) | 21 | 3 1 + +------------+------------+ + slot 5 | h(key1) | 11 | 4 1 + +------------+------------+ + slot 6 | h(key3) | 31 | 5 1 + +------------+------------+ +Adding "key4" for slot 3. It ends up in slot 5 with displacement 2 and key3 loops around to slot 0: + + hash value log offset optimal slot displacement + +------------+------------+ + slot 0 | key(key3) | 31 | 5 2 + +------------+------------+ + slot 1 | | | + +------------+------------+ + slot 2 | | | + +------------+------------+ + slot 3 | h(key0) | 1 | 3 0 + +------------+------------+ + slot 4 | h(key2) | 21 | 3 1 + +------------+------------+ + slot 5 | h(key4) | 41 | 3 2 + +------------+------------+ + slot 6 | h(key1) | 11 | 4 2 + +------------+------------+ + +Now, if we search for key123 which maps to slot 3 (but doesn't exist!), we can stop scanning as soon as we reach slot 6, because then the current displacement (3) is higher than the displacement of the entry at the current slot (2). + Compression ----------- -Sparkey also supports block level compression using google snappy. You select a block size which is then used to split the contents of the log into blocks. Each block is compressed independently with snappy. This can be useful if your bottleneck is file size and there is a lot of redundant data across adjacent entries. The downside of using this is that during lookups, at least one block needs to be decompressed. The larger blocks you choose, the better compression you may get, but you will also have higher lookup cost. This is a tradeoff that needs to be emperically evaluated for each use case. +Sparkey also supports block level compression using google snappy. You select a block size which is then used to split the contents of the log into blocks. Each block is compressed independently with snappy. This can be useful if your bottleneck is file size and there is a lot of redundant data across adjacent entries. The downside of using this is that during lookups, at least one block needs to be decompressed. The larger blocks you choose, the better compression you may get, but you will also have higher lookup cost. This is a tradeoff that needs to be empirically evaluated for each use case. diff --git a/configure.ac b/configure.ac index 6213be8..6518c1e 100644 --- a/configure.ac +++ b/configure.ac @@ -18,8 +18,15 @@ AM_CONDITIONAL([HAVE_DOXYGEN], AC_SEARCH_LIBS([snappy_compress], [snappy],,[AC_MSG_ERROR([Could not find snappy]) ]) -AC_SEARCH_LIBS([clock_gettime], - [rt],,[AC_MSG_ERROR([Could not find rt]) +AC_SEARCH_LIBS([ZSTD_compress], + [zstd],,[AC_MSG_ERROR([Could not find zstd]) +]) + +AM_CONDITIONAL([NOT_APPLE], [test x$build_vendor != xapple]) +AM_COND_IF([NOT_APPLE], [ + AC_SEARCH_LIBS([clock_gettime], + [rt],,[AC_MSG_ERROR([Could not find rt]) + ]) ]) AC_CONFIG_FILES([ Makefile diff --git a/debian/changelog b/debian/changelog index a994f05..ba6109d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +sparkey (0.2.0) unstable; urgency=low + + * Make sparkey appendlog only split each input line once on the delimiter + + -- Kristofer Karlsson Tue, 18 Feb 2014 15:38:58 +0100 + sparkey (0.1.0) unstable; urgency=low * Initial release. diff --git a/src/Makefile.am b/src/Makefile.am index 29bb685..d9a79ca 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,7 +6,7 @@ MurmurHash3.h buf.c hashalgorithms.c hashiter.c hashwriter.c \ logreader.c returncodes.c util.c buf.h hashalgorithms.h hashiter.h \ sparkey.h util.h endiantools.c \ hashheader.c hashreader.c logheader.c logwriter.c MurmurHash3.c \ -sparkey-internal.h +sparkey-internal.h compress.c pkginclude_HEADERS = sparkey.h diff --git a/src/bench.c b/src/bench.c index 2d71fb9..c411960 100644 --- a/src/bench.c +++ b/src/bench.c @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include @@ -106,17 +106,41 @@ static size_t total_file_size(const char** files) { } } -float wall() { +#ifdef __APPLE__ +#include +static float wall() { + static double multiplier = 0; + if (multiplier <= 0) { + mach_timebase_info_data_t info; + mach_timebase_info(&info); + multiplier = (double) info.numer / (double) info.denom / 1000000000.0; + } + return (float) (multiplier * mach_absolute_time()); +} +static float cpu() { + return wall(); +} + +#else + +#include +#ifdef CLOCK_MONOTONIC_RAW +#define CLOCK_SUITABLE CLOCK_MONOTONIC_RAW +#else +#define CLOCK_SUITABLE CLOCK_MONOTONIC +#endif +static float wall() { struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); + clock_gettime(CLOCK_SUITABLE, &tp); return tp.tv_sec + 1e-9 * tp.tv_nsec; } -float cpu() { +static float cpu() { struct timespec tp; clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &tp); return tp.tv_sec + 1e-9 * tp.tv_nsec; } +#endif typedef struct { char *name; @@ -157,6 +181,9 @@ static void sparkey_randomaccess(int n, int lookups) { sparkey_hashreader *myreader; sparkey_logiter *myiter; sparkey_assert(sparkey_hash_open(&myreader, "test.spi", "test.spl")); + + printf(" Number of hash collisions: %"PRIu64"\n", sparkey_hash_numcollisions(myreader)); + sparkey_logreader *logreader = sparkey_hash_getreader(myreader); sparkey_assert(sparkey_logiter_create(&myiter, logreader)); @@ -190,11 +217,15 @@ static void sparkey_create_uncompressed(int n) { sparkey_create(n, SPARKEY_COMPRESSION_NONE, 0); } -static void sparkey_create_compressed(int n) { - sparkey_create(n, SPARKEY_COMPRESSION_SNAPPY, 1024); +static void sparkey_create_snappy(int n) { + sparkey_create(n, SPARKEY_COMPRESSION_SNAPPY, 4 * 1024); +} + +static void sparkey_create_zstd(int n) { + sparkey_create(n, SPARKEY_COMPRESSION_ZSTD, 4 * 1024); } -static const char* sparkey_list[] = {"test.spi", "test.spl", NULL}; +static const char* sparkey_list[] = {"test.spi", "test.spl", NULL}; static const char** sparkey_files() { return sparkey_list; @@ -204,8 +235,12 @@ static candidate sparkey_candidate_uncompressed = { "Sparkey uncompressed", &sparkey_create_uncompressed, &sparkey_randomaccess, &sparkey_files }; -static candidate sparkey_candidate_compressed = { - "Sparkey compressed(1024)", &sparkey_create_compressed, &sparkey_randomaccess, &sparkey_files +static candidate sparkey_candidate_snappy = { + "Sparkey snappy(4K)", &sparkey_create_snappy, &sparkey_randomaccess, &sparkey_files +}; + +static candidate sparkey_candidate_zstd = { + "Sparkey zstd(4K)", &sparkey_create_zstd, &sparkey_randomaccess, &sparkey_files }; /* main */ @@ -217,7 +252,7 @@ void test(candidate *c, int n, int lookups) { rm_all_rec(c->files()); float t1_wall = wall(); - float t1_cpu = cpu(); + float t1_cpu = cpu(); c->create(n); @@ -226,7 +261,7 @@ void test(candidate *c, int n, int lookups) { printf(" creation time (wall): %2.2f\n", t2_wall - t1_wall); printf(" creation time (cpu): %2.2f\n", t2_cpu - t1_cpu); printf(" throughput (puts/cpusec): %2.2f\n", (float) n / (t2_cpu - t1_cpu)); - printf(" file size: %ld\n", total_file_size(c->files())); + printf(" file size: %zu\n", total_file_size(c->files())); c->randomaccess(n, lookups); @@ -246,10 +281,15 @@ int main() { test(&sparkey_candidate_uncompressed, 10*1000*1000, 1*1000*1000); test(&sparkey_candidate_uncompressed, 100*1000*1000, 1*1000*1000); - test(&sparkey_candidate_compressed, 1000, 1*1000*1000); - test(&sparkey_candidate_compressed, 1000*1000, 1*1000*1000); - test(&sparkey_candidate_compressed, 10*1000*1000, 1*1000*1000); - test(&sparkey_candidate_compressed, 100*1000*1000, 1*1000*1000); + test(&sparkey_candidate_snappy, 1000, 1*1000*1000); + test(&sparkey_candidate_snappy, 1000*1000, 1*1000*1000); + test(&sparkey_candidate_snappy, 10*1000*1000, 1*1000*1000); + test(&sparkey_candidate_snappy, 100*1000*1000, 1*1000*1000); + + test(&sparkey_candidate_zstd, 1000, 1*1000*1000); + test(&sparkey_candidate_zstd, 1000*1000, 1*1000*1000); + test(&sparkey_candidate_zstd, 10*1000*1000, 1*1000*1000); + test(&sparkey_candidate_zstd, 100*1000*1000, 1*1000*1000); return 0; } diff --git a/src/compress.c b/src/compress.c new file mode 100644 index 0000000..d2c38ab --- /dev/null +++ b/src/compress.c @@ -0,0 +1,94 @@ +/* +* Copyright (c) 2012-2020 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "sparkey-internal.h" +#include "sparkey.h" + +#include +#include + + +static uint32_t sparkey_snappy_max_compressed_size(uint32_t block_size) { + return snappy_max_compressed_length(block_size); +} + +static sparkey_returncode sparkey_snappy_decompress(uint8_t *input, uint32_t compressed_size, uint8_t *output, uint32_t *uncompressed_size) { + size_t rsize = *uncompressed_size; + snappy_status status = snappy_uncompress((char *) input, compressed_size, (char *) output, &rsize); + *uncompressed_size = rsize; + if (status == SNAPPY_OK) { + return SPARKEY_SUCCESS; + } + return SPARKEY_INTERNAL_ERROR; +} + +static sparkey_returncode sparkey_snappy_compress(uint8_t *input, uint32_t uncompressed_size, uint8_t *output, uint32_t *compressed_size) { + size_t rsize = *compressed_size; + snappy_status status = snappy_compress((char *) input, uncompressed_size, (char *) output, &rsize); + *compressed_size = rsize; + if (status == SNAPPY_OK) { + return SPARKEY_SUCCESS; + } + return SPARKEY_INTERNAL_ERROR; +} + +static uint32_t sparkey_zstd_max_compressed_size(uint32_t block_size) { + return ZSTD_compressBound(block_size); +} + +static sparkey_returncode sparkey_zstd_decompress(uint8_t *input, uint32_t compressed_size, uint8_t *output, uint32_t *uncompressed_size) { + size_t ret = ZSTD_decompress(output, *uncompressed_size, input, compressed_size); + if (ZSTD_isError(ret)) { + return SPARKEY_INTERNAL_ERROR; + } + *uncompressed_size = ret; + return SPARKEY_SUCCESS; +} + +static sparkey_returncode sparkey_zstd_compress(uint8_t *input, uint32_t uncompressed_size, uint8_t *output, uint32_t *compressed_size) { + size_t ret = ZSTD_compress(output, *compressed_size, input, uncompressed_size, 3); + if (ZSTD_isError(ret)) { + return SPARKEY_INTERNAL_ERROR; + } + *compressed_size = ret; + return SPARKEY_SUCCESS; +} + +struct sparkey_compressor sparkey_compressors[] = { + { + .max_compressed_size = NULL, + }, + { + .max_compressed_size = sparkey_snappy_max_compressed_size, + .decompress = sparkey_snappy_decompress, + .compress = sparkey_snappy_compress, + }, + { + .max_compressed_size = sparkey_zstd_max_compressed_size, + .decompress = sparkey_zstd_decompress, + .compress = sparkey_zstd_compress, + }, +}; + +int sparkey_uses_compressor(sparkey_compression_type t) { + switch (t) { + case SPARKEY_COMPRESSION_SNAPPY: + case SPARKEY_COMPRESSION_ZSTD: + return 1; + default: + return 0; + } +} diff --git a/src/endiantools.c b/src/endiantools.c index d095c4b..17eee63 100644 --- a/src/endiantools.c +++ b/src/endiantools.c @@ -33,20 +33,22 @@ #include "sparkey.h" static sparkey_returncode _write_full(int fd, uint8_t *buf, size_t count) { - ssize_t actual = write(fd, buf, count); - if (actual < 0) { - switch (errno) { - case ENOSPC: return SPARKEY_OUT_OF_DISK; - case EFBIG: return SPARKEY_FILE_SIZE_EXCEEDED; - case EBADF: return SPARKEY_FILE_CLOSED; - default: - printf("_write_full():%d bug: actual_read = %"PRIu64", wanted = %"PRIu64", errno = %d\n", __LINE__, (uint64_t)actual, (uint64_t)count, errno); - return SPARKEY_INTERNAL_ERROR; + while (count > 0) { + ssize_t actual = write(fd, buf, count); + if (actual < 0) { + switch (errno) { + case EINTR: + case EAGAIN: continue; + case ENOSPC: return SPARKEY_OUT_OF_DISK; + case EFBIG: return SPARKEY_FILE_SIZE_EXCEEDED; + case EBADF: return SPARKEY_FILE_CLOSED; + default: + fprintf(stderr, "_write_full():%d bug: actual_written = %"PRIu64", wanted = %"PRIu64", errno = %d\n", __LINE__, (uint64_t)actual, (uint64_t)count, errno); + return SPARKEY_INTERNAL_ERROR; + } } - } - if ((size_t) actual < count) { - printf("_write_full():%d bug: actual_read = %"PRIu64", wanted = %"PRIu64", errno = %d\n", __LINE__, (uint64_t)actual, (uint64_t)count, errno); - return SPARKEY_INTERNAL_ERROR; + count -= actual; + buf += actual; } return SPARKEY_SUCCESS; } diff --git a/src/hashreader.c b/src/hashreader.c index e62a7bb..23ee48e 100644 --- a/src/hashreader.c +++ b/src/hashreader.c @@ -39,6 +39,8 @@ sparkey_returncode sparkey_hash_open(sparkey_hashreader **reader_ref, const char return SPARKEY_INTERNAL_ERROR; } + reader->open_status = 0; + TRY(sparkey_load_hashheader(&reader->header, hash_filename), free_reader); TRY(sparkey_logreader_open_noalloc(&reader->log, log_filename), free_reader); if (reader->header.file_identifier != reader->log.header.file_identifier) { @@ -102,18 +104,18 @@ void sparkey_hash_close(sparkey_hashreader **reader_ref) { return; } - if (reader->open_status != MAGIC_VALUE_HASHREADER) { - return; - } sparkey_logreader_close_nodealloc(&reader->log); - reader->open_status = 0; - if (reader->data != NULL) { - munmap(reader->data, reader->data_len); - reader->data = NULL; + if (reader->open_status == MAGIC_VALUE_HASHREADER) { + reader->open_status = 0; + if (reader->data != NULL) { + munmap(reader->data, reader->data_len); + reader->data = NULL; + } + close(reader->fd); + reader->fd = -1; } - close(reader->fd); - reader->fd = -1; + free(reader); *reader_ref = NULL; } @@ -249,3 +251,7 @@ uint64_t sparkey_hash_numentries(sparkey_hashreader *reader) { return reader->header.num_entries; } +uint64_t sparkey_hash_numcollisions(sparkey_hashreader *reader) { + return reader->header.hash_collisions; +} + diff --git a/src/hashwriter.c b/src/hashwriter.c index be16f85..9a57ae2 100644 --- a/src/hashwriter.c +++ b/src/hashwriter.c @@ -101,7 +101,7 @@ static sparkey_returncode hash_delete(uint64_t wanted_slot, uint64_t hash, uint8 int entry_index2 = (int) (position2) & hash_header->entry_block_bitmask; position2 >>= hash_header->entry_block_bits; if (position2 < log->header.header_size || position2 >= log->header.data_end ) { - printf("hash_delete():%d bug: found pointer outside of range %"PRIu64"\n", __LINE__, position2); + fprintf(stderr, "hash_delete():%d bug: found pointer outside of range %"PRIu64"\n", __LINE__, position2); return SPARKEY_INTERNAL_ERROR; } if (hash == hash2) { @@ -111,7 +111,7 @@ static sparkey_returncode hash_delete(uint64_t wanted_slot, uint64_t hash, uint8 uint64_t keylen2 = ra_iter->keylen; uint64_t valuelen2 = ra_iter->valuelen; if (ra_iter->type != SPARKEY_ENTRY_PUT) { - printf("hash_delete():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type); + fprintf(stderr, "hash_delete():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type); return SPARKEY_INTERNAL_ERROR; } if (iter->keylen == keylen2) { @@ -162,7 +162,7 @@ static sparkey_returncode hash_delete(uint64_t wanted_slot, uint64_t hash, uint8 slot = 0; } } - printf("hash_put():%d bug: unreachable statement\n", __LINE__); + fprintf(stderr, "hash_put():%d bug: unreachable statement\n", __LINE__); return SPARKEY_INTERNAL_ERROR; } @@ -194,7 +194,7 @@ static sparkey_returncode hash_put(uint64_t wanted_slot, uint64_t hash, uint8_t uint64_t keylen2 = ra_iter->keylen; uint64_t valuelen2 = ra_iter->valuelen; if (ra_iter->type != SPARKEY_ENTRY_PUT) { - printf("hash_put():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type); + fprintf(stderr, "hash_put():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type); return SPARKEY_INTERNAL_ERROR; } if (iter->keylen == keylen2) { @@ -228,7 +228,7 @@ static sparkey_returncode hash_put(uint64_t wanted_slot, uint64_t hash, uint8_t slot = 0; } } - printf("hash_put():%d bug: unreachable statement\n", __LINE__); + fprintf(stderr, "hash_put():%d bug: unreachable statement\n", __LINE__); return SPARKEY_INTERNAL_ERROR; } @@ -287,7 +287,7 @@ static sparkey_returncode read_fully(int fd, uint8_t *buf, size_t count) { while (count > 0) { ssize_t actual_read = read(fd, buf, count); if (actual_read < 0) { - printf("read_fully():%d bug: actual_read = %"PRIu64", errno = %d\n", __LINE__, (uint64_t)actual_read, errno); + fprintf(stderr, "read_fully():%d bug: actual_read = %"PRIu64", errno = %d\n", __LINE__, (uint64_t)actual_read, errno); return SPARKEY_INTERNAL_ERROR; } count -= actual_read; @@ -324,7 +324,7 @@ static sparkey_returncode fill_hash(uint8_t *hashtable, const char *hash_filenam uint64_t buffer_size = slot_size * 1024; uint8_t *buf = malloc(buffer_size); if (buf == NULL) { - printf("fill_hash():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, buffer_size); + fprintf(stderr, "fill_hash():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, buffer_size); return SPARKEY_INTERNAL_ERROR; } @@ -342,7 +342,7 @@ static sparkey_returncode fill_hash(uint8_t *hashtable, const char *hash_filenam free(buf); if (close(fd) < 0) { if (returncode == SPARKEY_SUCCESS) { - printf("fill_hash():%d bug: could not close file. errno = %d\n", __LINE__, errno); + fprintf(stderr, "fill_hash():%d bug: could not close file. errno = %d\n", __LINE__, errno); returncode = SPARKEY_INTERNAL_ERROR; } } @@ -370,6 +370,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log uint64_t start; uint32_t hash_seed; int copy_old; + uint32_t old_hash_size = 0; returncode = sparkey_load_hashheader(&old_header, hash_filename); if (returncode == SPARKEY_SUCCESS && old_header.file_identifier == log_header.file_identifier && @@ -382,10 +383,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log hash_header.garbage_size = old_header.garbage_size; copy_old = 1; - if (old_header.data_end == log->header.data_end) { - // Nothing needs to be done - just exit - goto close_iter; - } + old_hash_size = old_header.hash_size; } else { cap = log_header.num_puts * 1.3; start = log_header.header_size; @@ -411,7 +409,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log } else { hash_header.address_size = 8; } - if (old_header.hash_size == 8 || hash_header.hash_capacity >= (1 << 23)) { + if (old_hash_size == 8 || hash_header.hash_capacity >= (1 << 23)) { hash_header.hash_size = 8; } else { hash_header.hash_size = 4; @@ -424,7 +422,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log goto close_iter; } } - if (hash_header.hash_size != old_header.hash_size) { + if (hash_header.hash_size != old_hash_size) { copy_old = 0; } hash_header.hash_algorithm = sparkey_get_hash_algorithm(hash_header.hash_size); @@ -433,7 +431,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log uint64_t hashsize = slot_size * hash_header.hash_capacity; uint8_t *hashtable = malloc(hashsize); if (hashtable == NULL) { - printf("sparkey_hash_write():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, hashsize); + fprintf(stderr, "sparkey_hash_write():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, hashsize); returncode = SPARKEY_INTERNAL_ERROR; goto close_iter; } @@ -445,6 +443,10 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log hash_header.hash_collisions = 0; if (copy_old) { + if (old_header.data_end == log->header.data_end) { + // Nothing needs to be done - just exit + goto close_iter; + } TRY(fill_hash(hashtable, hash_filename, &old_header, &hash_header), free_hashtable); TRY(sparkey_logiter_seek(iter, log, start), free_hashtable); } @@ -458,7 +460,7 @@ sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log case SPARKEY_ITER_ACTIVE: break; default: - printf("sparkey_hash_write():%d bug: invalid iter state: %d\n", __LINE__, iter->state); + fprintf(stderr, "sparkey_hash_write():%d bug: invalid iter state: %d\n", __LINE__, iter->state); returncode = SPARKEY_INTERNAL_ERROR; goto free_hashtable; break; diff --git a/src/logheader.c b/src/logheader.c index 2a2e6a1..ff472c9 100644 --- a/src/logheader.c +++ b/src/logheader.c @@ -22,7 +22,7 @@ #include "endiantools.h" #include "util.h" -static char * compression_types[] = { "Uncompressed", "Snappy", NULL }; +static char * compression_types[] = { "Uncompressed", "Snappy", "Zstd", NULL }; void print_logheader(sparkey_logheader *header) { printf("Log file version %d.%d\n", header->major_version, @@ -59,7 +59,7 @@ static sparkey_returncode logheader_version0(sparkey_logheader *header, FILE *fp if (header->num_deletes > header->data_end) { return SPARKEY_LOG_HEADER_CORRUPT; } - if (header->compression_type > SPARKEY_COMPRESSION_SNAPPY) { + if (header->compression_type > SPARKEY_COMPRESSION_ZSTD) { return SPARKEY_LOG_HEADER_CORRUPT; } return SPARKEY_SUCCESS; diff --git a/src/logreader.c b/src/logreader.c index b31b091..46f293b 100644 --- a/src/logreader.c +++ b/src/logreader.c @@ -21,8 +21,6 @@ #include #include -#include - #include "sparkey.h" #include "sparkey-internal.h" #include "logheader.h" @@ -168,21 +166,15 @@ sparkey_returncode sparkey_logiter_create(sparkey_logiter **iter_ref, sparkey_lo iter->block_len = 0; iter->state = SPARKEY_ITER_NEW; - switch (log->header.compression_type) { - case SPARKEY_COMPRESSION_NONE: - iter->compression_buf_allocated = 0; - break; - case SPARKEY_COMPRESSION_SNAPPY: + if (sparkey_uses_compressor(log->header.compression_type)) { iter->compression_buf_allocated = 1; iter->compression_buf = malloc(log->header.compression_block_size); if (iter->compression_buf == NULL) { free(iter); return SPARKEY_INTERNAL_ERROR; } - break; - default: - free(iter); - return SPARKEY_INTERNAL_ERROR; + } else { + iter->compression_buf_allocated = 0; } *iter_ref = iter; @@ -214,38 +206,29 @@ static sparkey_returncode seekblock(sparkey_logiter *iter, sparkey_logreader *lo if (iter->block_position == position) { return SPARKEY_SUCCESS; } - if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) { - iter->compression_buf = &log->data[position]; - iter->block_position = position; - iter->next_block_position = log->header.data_end; - iter->block_len = log->data_len - position; - return SPARKEY_SUCCESS; - } - if (log->header.compression_type == SPARKEY_COMPRESSION_SNAPPY) { + if (sparkey_uses_compressor(log->header.compression_type)) { uint64_t pos = position; - // TODO: assert that size_t >= uint64_t - size_t compressed_size = read_vlq(log->data, &pos); + // TODO: assert that we're not reading > uint32_t + uint32_t compressed_size = read_vlq(log->data, &pos); uint64_t next_pos = pos + compressed_size; - const char *input = (char *) &log->data[pos]; + uint32_t uncompressed_size = log->header.compression_block_size; - size_t uncompressed_size = log->header.compression_block_size; - snappy_status status = snappy_uncompress(input, compressed_size, (char *) iter->compression_buf, &uncompressed_size); - switch (status) { - case SNAPPY_OK: break; - case SNAPPY_INVALID_INPUT: - return SPARKEY_INTERNAL_ERROR; - case SNAPPY_BUFFER_TOO_SMALL: - return SPARKEY_INTERNAL_ERROR; - default: - return SPARKEY_INTERNAL_ERROR; + sparkey_returncode ret = sparkey_compressors[log->header.compression_type].decompress( + &log->data[pos], compressed_size, iter->compression_buf, &uncompressed_size); + if (ret != SPARKEY_SUCCESS) { + return ret; } + iter->block_position = position; iter->next_block_position = next_pos; iter->block_len = uncompressed_size; - return SPARKEY_SUCCESS; + } else { + iter->compression_buf = &log->data[position]; + iter->block_position = position; + iter->next_block_position = log->header.data_end; + iter->block_len = log->data_len - position; } - - return SPARKEY_INTERNAL_ERROR; + return SPARKEY_SUCCESS; } sparkey_returncode sparkey_logiter_seek(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) { @@ -485,6 +468,14 @@ uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log) { return log->header.max_value_len; } +int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log) { + return log->header.compression_block_size; +} + +sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log) { + return log->header.compression_type; +} + sparkey_iter_state sparkey_logiter_state(sparkey_logiter *iter) { return iter->state; } diff --git a/src/logwriter.c b/src/logwriter.c index 56ddd8f..a746532 100644 --- a/src/logwriter.c +++ b/src/logwriter.c @@ -20,8 +20,6 @@ #include #include -#include - #include "util.h" #include "sparkey.h" #include "logheader.h" @@ -51,49 +49,45 @@ static sparkey_returncode assert_writer_open(sparkey_logwriter *log) { return SPARKEY_SUCCESS; } -sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log, const char *filename, sparkey_compression_type compression_type, int compression_block_size) { - *log = malloc(sizeof(sparkey_logwriter)); - if (*log == NULL) { - return SPARKEY_INTERNAL_ERROR; +sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log_ref, const char *filename, sparkey_compression_type compression_type, int compression_block_size) { + sparkey_returncode returncode; + int fd = 0; + sparkey_logwriter *l = malloc(sizeof(sparkey_logwriter)); + if (l == NULL) { + TRY(SPARKEY_INTERNAL_ERROR, error); } - sparkey_logwriter *l = *log; - switch (compression_type) { - case SPARKEY_COMPRESSION_NONE: - compression_block_size = 0; - l->compressed = NULL; - break; - case SPARKEY_COMPRESSION_SNAPPY: + if (sparkey_uses_compressor(compression_type)) { if (compression_block_size < 10) { - return SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE; + TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error); } - l->max_compressed_size = snappy_max_compressed_length(compression_block_size); + l->max_compressed_size = sparkey_compressors[compression_type].max_compressed_size(compression_block_size); l->compressed = malloc(l->max_compressed_size); if (l->compressed == NULL) { - return SPARKEY_INTERNAL_ERROR; + TRY(SPARKEY_INTERNAL_ERROR, error); } - break; - default: - return SPARKEY_INVALID_COMPRESSION_TYPE; + } else { + compression_block_size = 0; + l->compressed = NULL; } // Try removing it first, to avoid overwriting existing files that readers may be using. if (remove(filename) < 0) { int e = errno; if (e != ENOENT) { - return sparkey_remove_returncode(e); + TRY(sparkey_remove_returncode(e), error); } } - int fd = open(filename, O_WRONLY | O_TRUNC | O_CREAT, 00644); + fd = open(filename, O_WRONLY | O_TRUNC | O_CREAT, 00644); if (fd == -1) { - return sparkey_create_returncode(errno); + TRY(sparkey_create_returncode(errno), error); } l->fd = fd; l->header.compression_block_size = compression_block_size; l->header.compression_type = compression_type; - RETHROW(rand32(&(l->header.file_identifier))); + TRY(rand32(&(l->header.file_identifier)), error); l->header.data_end = LOG_HEADER_SIZE; l->header.major_version = LOG_MAJOR_VERSION; l->header.minor_version = LOG_MINOR_VERSION; @@ -105,69 +99,78 @@ sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log, const char l->header.max_key_len = 0; l->header.max_value_len = 0; - RETHROW(write_logheader(fd, &l->header)); + TRY(write_logheader(fd, &l->header), error); off_t pos = lseek(fd, 0, SEEK_CUR); if (pos != LOG_HEADER_SIZE) { - return SPARKEY_INTERNAL_ERROR; + TRY(SPARKEY_INTERNAL_ERROR, error); } - RETHROW(buf_init(&l->file_buf, 1024*1024)); - RETHROW(buf_init(&l->block_buf, compression_block_size)); + TRY(buf_init(&l->file_buf, 1024*1024), error); + TRY(buf_init(&l->block_buf, compression_block_size), error); l->entry_count = 0; l->open_status = MAGIC_VALUE_LOGWRITER; + *log_ref = l; return SPARKEY_SUCCESS; +error: + free(l); + if (fd > 0) close(fd); + return returncode; } -sparkey_returncode sparkey_logwriter_append(sparkey_logwriter *log, const char *filename) { - if (log->open_status == MAGIC_VALUE_LOGWRITER) { - RETHROW(sparkey_logwriter_close(&log)); +sparkey_returncode sparkey_logwriter_append(sparkey_logwriter **log_ref, const char *filename) { + sparkey_returncode returncode; + int fd = 0; + sparkey_logwriter *log = malloc(sizeof(sparkey_logwriter)); + if (log == NULL) { + TRY(SPARKEY_INTERNAL_ERROR, error); } - RETHROW(sparkey_load_logheader(&log->header, filename)); + TRY(sparkey_load_logheader(&log->header, filename), error); if (log->header.major_version != LOG_MAJOR_VERSION) { - return SPARKEY_WRONG_LOG_MAJOR_VERSION; + TRY(SPARKEY_WRONG_LOG_MAJOR_VERSION, error); } if (log->header.minor_version != LOG_MINOR_VERSION) { - return SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION; + TRY(SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION, error); } - switch (log->header.compression_type) { - case SPARKEY_COMPRESSION_NONE: - log->header.compression_block_size = 0; - log->compressed = NULL; - break; - case SPARKEY_COMPRESSION_SNAPPY: + if (sparkey_uses_compressor(log->header.compression_type)) { if (log->header.compression_block_size < 10) { - return SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE; + TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error); } - log->max_compressed_size = snappy_max_compressed_length(log->header.compression_block_size); + log->max_compressed_size = sparkey_compressors[log->header.compression_type].max_compressed_size( + log->header.compression_block_size); log->compressed = malloc(log->max_compressed_size); - break; - default: - return SPARKEY_INVALID_COMPRESSION_TYPE; + } else { + log->header.compression_block_size = 0; + log->compressed = NULL; } - int fd = open(filename, O_WRONLY, 00644); + fd = open(filename, O_WRONLY, 00644); if (fd == -1) { int e = errno; - return sparkey_create_returncode(e); + TRY(sparkey_create_returncode(e), error); } log->fd = fd; lseek(fd, log->header.data_end, SEEK_SET); - RETHROW(buf_init(&log->file_buf, 1024*1024)); - RETHROW(buf_init(&log->block_buf, log->header.compression_block_size)); + TRY(buf_init(&log->file_buf, 1024*1024), error); + TRY(buf_init(&log->block_buf, log->header.compression_block_size), error); log->entry_count = 0; log->open_status = MAGIC_VALUE_LOGWRITER; + *log_ref = log; return SPARKEY_SUCCESS; +error: + free(log); + if (fd > 0) close(fd); + return returncode; } -static sparkey_returncode flush_snappy(sparkey_logwriter *log) { +static sparkey_returncode flush_compressed(sparkey_logwriter *log) { log->flushed = 1; if (log->entry_count > (int) log->header.max_entries_per_block) { log->header.max_entries_per_block = log->entry_count; @@ -175,19 +178,16 @@ static sparkey_returncode flush_snappy(sparkey_logwriter *log) { log->entry_count = 0; sparkey_buf *block_buf = &log->block_buf; uint8_t *compressed = log->compressed; - uint32_t max_compressed_size = log->max_compressed_size; + uint32_t compressed_size = log->max_compressed_size; sparkey_buf *file_buf = &log->file_buf; int fd = log->fd; - size_t compressed_size = max_compressed_size; - snappy_status status = snappy_compress((char *) block_buf->start, buf_used(block_buf), (char *) compressed, &compressed_size); - switch (status) { - case SNAPPY_OK: break; - case SNAPPY_INVALID_INPUT: - case SNAPPY_BUFFER_TOO_SMALL: - default: - return SPARKEY_INTERNAL_ERROR; + sparkey_returncode ret = sparkey_compressors[log->header.compression_type].compress( + block_buf->start, buf_used(block_buf), compressed, &compressed_size); + if (ret != SPARKEY_SUCCESS) { + return ret; } + uint8_t buf1[10]; ptrdiff_t written1 = write_vlq(buf1, compressed_size); RETHROW(buf_add(file_buf, fd, buf1, written1)); @@ -200,7 +200,7 @@ static sparkey_returncode flush_snappy(sparkey_logwriter *log) { sparkey_returncode sparkey_logwriter_flush(sparkey_logwriter *log) { RETHROW(assert_writer_open(log)); if (buf_used(&log->block_buf) > 0) { - RETHROW(flush_snappy(log)); + RETHROW(flush_compressed(log)); } if (buf_used(&log->file_buf) > 0) { RETHROW(buf_flushfile(&log->file_buf, log->fd)); @@ -236,7 +236,7 @@ sparkey_returncode sparkey_logwriter_close(sparkey_logwriter **log) { return SPARKEY_SUCCESS; } -static sparkey_returncode snappy_add(sparkey_logwriter *log, const uint8_t *data, ptrdiff_t len) { +static sparkey_returncode compressed_add(sparkey_logwriter *log, const uint8_t *data, ptrdiff_t len) { sparkey_buf *block_buf = &log->block_buf; while (1) { @@ -250,7 +250,7 @@ static sparkey_returncode snappy_add(sparkey_logwriter *log, const uint8_t *data block_buf->cur += remaining; data += remaining; len -= remaining; - RETHROW(flush_snappy(log)); + RETHROW(flush_compressed(log)); } } return SPARKEY_SUCCESS; @@ -266,33 +266,28 @@ static sparkey_returncode log_add(sparkey_logwriter *log, uint64_t num1, uint64_ *datasize = written1 + written2 + len1 + len2; uint64_t remaining; - switch (log->header.compression_type) { - case SPARKEY_COMPRESSION_NONE: - RETHROW(buf_add(&log->file_buf, log->fd, buf1, written1)); - RETHROW(buf_add(&log->file_buf, log->fd, buf2, written2)); - RETHROW(buf_add(&log->file_buf, log->fd, data1, len1)); - RETHROW(buf_add(&log->file_buf, log->fd, data2, len2)); - break; - case SPARKEY_COMPRESSION_SNAPPY: + if (sparkey_uses_compressor(log->header.compression_type)) { remaining = buf_remaining(&log->block_buf); // todo: make it smarter by checking if it's better to flush directly uint64_t fits_in_one = written1 + written2 + len1 + len2 <= buf_size(&log->block_buf); uint64_t doesnt_fit_this = written1 + written2 + len1 + len2 > buf_remaining(&log->block_buf); if ((remaining < written1 + written2) || (fits_in_one && doesnt_fit_this)) { - RETHROW(flush_snappy(log)); + RETHROW(flush_compressed(log)); } log->entry_count++; log->flushed = 0; - RETHROW(snappy_add(log, buf1, written1)); - RETHROW(snappy_add(log, buf2, written2)); - RETHROW(snappy_add(log, data1, len1)); - RETHROW(snappy_add(log, data2, len2)); + RETHROW(compressed_add(log, buf1, written1)); + RETHROW(compressed_add(log, buf2, written2)); + RETHROW(compressed_add(log, data1, len1)); + RETHROW(compressed_add(log, data2, len2)); if (log->flushed && buf_used(&log->block_buf) > 0) { - RETHROW(flush_snappy(log)); + RETHROW(flush_compressed(log)); } - break; - default: - return SPARKEY_INTERNAL_ERROR; + } else { + RETHROW(buf_add(&log->file_buf, log->fd, buf1, written1)); + RETHROW(buf_add(&log->file_buf, log->fd, buf2, written2)); + RETHROW(buf_add(&log->file_buf, log->fd, data1, len1)); + RETHROW(buf_add(&log->file_buf, log->fd, data2, len2)); } return SPARKEY_SUCCESS; } diff --git a/src/main.c b/src/main.c index b2c247f..64facc1 100644 --- a/src/main.c +++ b/src/main.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2012-2013 Spotify AB +* Copyright (c) 2012-2014 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -17,37 +17,129 @@ #include #include #include +#include +#include #include "logheader.h" #include "hashheader.h" +#include "endiantools.h" +#include "util.h" #include "sparkey.h" -void usage() { - printf("Usage: sparkey \n"); - printf("Commands: info [file...]\n"); - printf("Commands: get \n"); +#define MINIMUM_CAPACITY (1<<8) +#define MAXIMUM_CAPACITY (1<<28) +#define COMP_DEFAULT_BLOCKSIZE (1<<12) +#define COMP_MAX_BLOCKSIZE (1<<30) +#define COMP_MIN_BLOCKSIZE (1<<4) + +static void usage() { + fprintf(stderr, "Usage: sparkey []\n"); + fprintf(stderr, "Commands:\n"); + fprintf(stderr, " info - Show information about sparkey files.\n"); + fprintf(stderr, " get - Get the value associated with a key.\n"); + fprintf(stderr, " writehash - Generate a hash file from a log file.\n"); + fprintf(stderr, " createlog - Create an empty log file.\n"); + fprintf(stderr, " appendlog - Append key-value pairs to an existing log file.\n"); + fprintf(stderr, " rewrite - Rewrite an existing log/index file pair, " + "trimming away all replaced entries and " + "possibly changing the compression format.\n"); + fprintf(stderr, " help - Show this help text.\n"); } -int info(int argv, const char **args) { - int retval = 0; +static void usage_info() { + fprintf(stderr, "Usage: sparkey info file1 [file2, ...]\n"); + fprintf(stderr, " Show information about files. Files can be either index or log files.\n"); +} + +static void usage_get() { + fprintf(stderr, "Usage: sparkey get \n"); + fprintf(stderr, " Get the value for a specific key.\n"); + fprintf(stderr, " Returns 0 if found,\n"); + fprintf(stderr, " 1 on error,\n"); + fprintf(stderr, " 2 on not-found.\n"); +} + +static void usage_writehash() { + fprintf(stderr, "Usage: sparkey writehash \n"); + fprintf(stderr, " Write a new index file for a log file.\n"); + fprintf(stderr, " Creates and possibly overwrites a new file with file ending .spi\n"); +} + +static void usage_createlog() { + fprintf(stderr, "Usage: sparkey createlog [-c | -b ] \n"); + fprintf(stderr, " Create a new empty log file.\n"); + fprintf(stderr, "Options:\n"); + fprintf(stderr, " -c Compression algorithm [default: none]\n"); + fprintf(stderr, " -b Compression blocksize [default: %d]\n", + COMP_DEFAULT_BLOCKSIZE); + fprintf(stderr, " [min: %d, max: %d]\n", + COMP_MIN_BLOCKSIZE, COMP_MAX_BLOCKSIZE); +} + +static void usage_appendlog() { + fprintf(stderr, "Usage: sparkey appendlog [-d ] \n"); + fprintf(stderr, " Append data from STDIN to a log file with settings.\n"); + fprintf(stderr, " data must be formatted as a sequence of\n"); + fprintf(stderr, " \n"); + fprintf(stderr, "Options:\n"); + fprintf(stderr, " -d Delimiter char to split input records on [default: TAB]\n"); +} + +static void usage_rewrite() { + fprintf(stderr, "Usage: sparkey rewrite [-c | -b ] \n"); + fprintf(stderr, " Iterate over all entries in and create a new index and log pair\n"); + fprintf(stderr, "Options:\n"); + fprintf(stderr, " -c Compression algorithm [default: same as before]\n"); + fprintf(stderr, " -b Compression blocksize [default: same as before]\n"); + fprintf(stderr, " [min: %d, max: %d]\n", + COMP_MIN_BLOCKSIZE, COMP_MAX_BLOCKSIZE); +} + +static void assert(sparkey_returncode rc) { + if (rc != SPARKEY_SUCCESS) { + fprintf(stderr, "%s\n", sparkey_errstring(rc)); + // skip cleanup - program exit will clean up implicitly. + exit(1); + } +} + +static int info_file(const char *filename) { sparkey_logheader logheader; sparkey_hashheader hashheader; - for (int i = 0; i < argv; i++) { - const char *filename = args[i]; - sparkey_returncode res = sparkey_load_logheader(&logheader, filename); - if (res == SPARKEY_SUCCESS) { - printf("%s\n", filename); - print_logheader(&logheader); - } else { - sparkey_returncode res2 = sparkey_load_hashheader(&hashheader, filename); - if (res2 == SPARKEY_SUCCESS) { - printf("%s\n", args[i]); - print_hashheader(&hashheader); - } else { - printf("%s is neither a sparkey log file (%s) nor an index file (%s)\n", filename, sparkey_errstring(res), sparkey_errstring(res2)); - retval = 1; - } - } + sparkey_returncode res = sparkey_load_logheader(&logheader, filename); + if (res == SPARKEY_SUCCESS) { + printf("Filename: %s\n", filename); + print_logheader(&logheader); + printf("\n"); + return 0; + } + + if (res != SPARKEY_WRONG_LOG_MAGIC_NUMBER) { + fprintf(stderr, "%s: %s\n", filename, sparkey_errstring(res)); + return 1; + } + + res = sparkey_load_hashheader(&hashheader, filename); + if (res == SPARKEY_SUCCESS) { + printf("Filename: %s\n", filename); + print_hashheader(&hashheader); + printf("\n"); + return 0; + } + + if (res != SPARKEY_WRONG_HASH_MAGIC_NUMBER) { + fprintf(stderr, "%s: %s\n", filename, sparkey_errstring(res)); + return 1; + } + + fprintf(stderr, "%s: Not a sparkey file.\n", filename); + return 1; +} + +int info(int argc, char * const *argv) { + int retval = 0; + for (int i = 0; i < argc; i++) { + retval |= info_file(argv[i]); } return retval; } @@ -56,30 +148,12 @@ int get(const char *hashfile, const char *logfile, const char *key) { sparkey_hashreader *reader; sparkey_logreader *logreader; sparkey_logiter *iter; - sparkey_returncode errcode = sparkey_hash_open(&reader, hashfile, logfile); - if (errcode != SPARKEY_SUCCESS) { - puts(sparkey_errstring(errcode)); - puts("\n"); - return 1; - } + assert(sparkey_hash_open(&reader, hashfile, logfile)); logreader = sparkey_hash_getreader(reader); - errcode = sparkey_logiter_create(&iter, logreader); - if (errcode != SPARKEY_SUCCESS) { - sparkey_hash_close(&reader); - puts(sparkey_errstring(errcode)); - puts("\n"); - return 1; - } + assert(sparkey_logiter_create(&iter, logreader)); uint64_t keylen = strlen(key); - errcode = sparkey_hash_get(reader, (uint8_t*) key, keylen, iter); - if (errcode != SPARKEY_SUCCESS) { - sparkey_logiter_close(&iter); - sparkey_hash_close(&reader); - puts(sparkey_errstring(errcode)); - puts("\n"); - return 1; - } + assert(sparkey_hash_get(reader, (uint8_t*) key, keylen, iter)); int exitcode = 2; if (sparkey_logiter_state(iter) == SPARKEY_ITER_ACTIVE) { @@ -87,15 +161,8 @@ int get(const char *hashfile, const char *logfile, const char *key) { uint8_t * res; uint64_t len; do { - errcode = sparkey_logiter_valuechunk(iter, logreader, 1 << 31, &res, &len); - if (errcode != SPARKEY_SUCCESS) { - sparkey_logiter_close(&iter); - sparkey_hash_close(&reader); - puts(sparkey_errstring(errcode)); - puts("\n"); - return 1; - } - fwrite(res, 1, len, stdout); + assert(sparkey_logiter_valuechunk(iter, logreader, 1 << 31, &res, &len)); + assert(write_full(STDOUT_FILENO, res, len)); } while (len > 0); } sparkey_logiter_close(&iter); @@ -103,34 +170,338 @@ int get(const char *hashfile, const char *logfile, const char *key) { return exitcode; } -int main(int argv, const char **args) { - if (argv < 2) { +int writehash(const char *indexfile, const char *logfile) { + assert(sparkey_hash_write(indexfile, logfile, 0)); + return 0; +} + +static size_t read_line(char **buffer, size_t *capacity, FILE *input) { + char *buf = *buffer; + size_t cap = *capacity, pos = 0; + + if (cap < MINIMUM_CAPACITY) { + cap = MINIMUM_CAPACITY; + } else if (cap > MAXIMUM_CAPACITY) { + return pos; + } + + while (1) { + buf = realloc(buf, cap); + if (buf == NULL) { + return pos; + } + *buffer = buf; + *capacity = cap; + + if (fgets(buf + pos, cap - pos, input) == NULL) { + break; + } + + pos += strcspn(buf + pos, "\n"); + if (buf[pos] == '\n') { + break; + } + + cap *= 2; + } + + return pos; +} + +static int append(sparkey_logwriter *writer, char delimiter, FILE *input) { + char *line = NULL; + char *key = NULL; + char *value = NULL; + size_t size = 0; + sparkey_returncode returncode; + char delim[2]; + delim[0] = delimiter; + delim[1] = '\0'; + + for (size_t end = read_line(&line, &size, input); line[end] == '\n'; end = read_line(&line, &size, input)) { + // Split on the first delimiter + key = strtok(line, delim); + value = strtok(NULL, "\n"); + if (value != NULL) { + // Write to log + TRY(sparkey_logwriter_put(writer, strlen(key), (uint8_t*)key, strlen(value), (uint8_t*)value), put_fail); + } else { + goto split_fail; + } + } + + free(line); + return 0; + +split_fail: + free(line); + fprintf(stderr, "Cannot split input line, aborting early.\n"); + return 1; +put_fail: + free(line); + fprintf(stderr, "Cannot append line to log file, aborting early: %s\n", sparkey_errstring(returncode)); + return 1; +} + +int main(int argc, char * const *argv) { + if (argc < 2) { usage(); - return 1; + return 0; } - if (strcmp(args[1], "info") == 0) { - if (argv < 3) { - usage(); + const char *command = argv[1]; + if (strcmp(command, "info") == 0) { + if (argc < 3) { + usage_info(); return 1; } - return info(argv - 2, args + 2); - } else if (strcmp(args[1], "get") == 0) { - if (argv < 4) { - usage(); + return info(argc - 2, argv + 2); + } else if (strcmp(command, "get") == 0) { + if (argc < 4) { + usage_get(); return 1; } - const char *index_filename = args[2]; + const char *index_filename = argv[2]; char *log_filename = sparkey_create_log_filename(index_filename); if (log_filename == NULL) { - printf("index filename must end with .spi\n"); + fprintf(stderr, "index filename must end with .spi\n"); return 1; } - int retval = get(args[2], log_filename, args[3]); + int retval = get(argv[2], log_filename, argv[3]); free(log_filename); return retval; - } else { - printf("Unknown command: %s\n", args[1]); + } else if (strcmp(command, "writehash") == 0) { + if (argc < 3) { + usage_writehash(); + return 1; + } + const char *log_filename = argv[2]; + char *index_filename = sparkey_create_index_filename(log_filename); + if (index_filename == NULL) { + fprintf(stderr, "log filename must end with .spl\n"); + return 1; + } + int retval = writehash(index_filename, log_filename); + free(index_filename); + return retval; + } else if (strcmp(command, "createlog") == 0) { + opterr = 0; + optind = 2; + int opt_char; + int block_size = COMP_DEFAULT_BLOCKSIZE; + sparkey_compression_type compression_type = SPARKEY_COMPRESSION_NONE; + while ((opt_char = getopt (argc, argv, "b:c:")) != -1) { + switch (opt_char) { + case 'b': + if (sscanf(optarg, "%d", &block_size) != 1) { + fprintf(stderr, "Block size must be an integer, but was '%s'\n", optarg); + return 1; + } + if (block_size > COMP_MAX_BLOCKSIZE || block_size < COMP_MIN_BLOCKSIZE) { + fprintf(stderr, "Block size %d, not in range. Max is %d, min is %d\n", + block_size, COMP_MAX_BLOCKSIZE, COMP_MIN_BLOCKSIZE); + return 1; + } + break; + case 'c': + if (strcmp(optarg, "none") == 0) { + compression_type = SPARKEY_COMPRESSION_NONE; + } else if (strcmp(optarg, "snappy") == 0) { + compression_type = SPARKEY_COMPRESSION_SNAPPY; + } else if (strcmp(optarg, "zstd") == 0) { + compression_type = SPARKEY_COMPRESSION_ZSTD; + } else { + fprintf(stderr, "Invalid compression type: '%s'\n", optarg); + return 1; + } + break; + case '?': + if (optopt == 'b' || optopt == 'c') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else if (isprint(optopt)) { + fprintf(stderr, "Unknown option '-%c'.\n", optopt); + } else { + fprintf(stderr, "Unknown option character '\\x%x'.\n", optopt); + } + return 1; + default: + fprintf(stderr, "Unknown option parsing failure\n"); + return 1; + } + } + + if (optind >= argc) { + usage_createlog(); + return 1; + } + + const char *log_filename = argv[optind]; + sparkey_logwriter *writer; + assert(sparkey_logwriter_create(&writer, log_filename, + compression_type, block_size)); + assert(sparkey_logwriter_close(&writer)); + return 0; + } else if (strcmp(command, "appendlog") == 0) { + opterr = 0; + optind = 2; + int opt_char; + char delimiter = '\t'; + while ((opt_char = getopt (argc, argv, "d:")) != -1) { + switch (opt_char) { + case 'd': + if (strlen(optarg) != 1) { + fprintf(stderr, "delimiter must be one character, but was '%s'\n", optarg); + return 1; + } + delimiter = optarg[0]; + break; + case '?': + if (optopt == 'd') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else if (isprint(optopt)) { + fprintf(stderr, "Unknown option '-%c'.\n", optopt); + } else { + fprintf(stderr, "Unknown option character '\\x%x'.\n", optopt); + } + return 1; + default: + fprintf(stderr, "Unknown option parsing failure\n"); + return 1; + } + } + + if (optind >= argc) { + usage_appendlog(); + return 1; + } + + const char *log_filename = argv[optind]; + sparkey_logwriter *writer; + assert(sparkey_logwriter_append(&writer, log_filename)); + int rc = append(writer, delimiter, stdin); + assert(sparkey_logwriter_close(&writer)); + return rc; + } else if (strcmp(command, "rewrite") == 0) { + opterr = 0; + optind = 2; + int opt_char; + int block_size = -1; + sparkey_compression_type compression_type = SPARKEY_COMPRESSION_NONE; + int compression_set = 0; + while ((opt_char = getopt (argc, argv, "b:c:")) != -1) { + switch (opt_char) { + case 'b': + if (sscanf(optarg, "%d", &block_size) != 1) { + fprintf(stderr, "Block size must be an integer, but was '%s'\n", optarg); + return 1; + } + if (block_size > COMP_MAX_BLOCKSIZE || block_size < COMP_MIN_BLOCKSIZE) { + fprintf(stderr, "Block size %d, not in range. Max is %d, min is %d\n", + block_size, COMP_MAX_BLOCKSIZE, COMP_MIN_BLOCKSIZE); + return 1; + } + break; + case 'c': + compression_set = 1; + if (strcmp(optarg, "none") == 0) { + compression_type = SPARKEY_COMPRESSION_NONE; + } else if (strcmp(optarg, "snappy") == 0) { + compression_type = SPARKEY_COMPRESSION_SNAPPY; + } else if (strcmp(optarg, "zstd") == 0) { + compression_type = SPARKEY_COMPRESSION_ZSTD; + } else { + fprintf(stderr, "Invalid compression type: '%s'\n", optarg); + return 1; + } + break; + case '?': + if (optopt == 'b' || optopt == 'c') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else if (isprint(optopt)) { + fprintf(stderr, "Unknown option '-%c'.\n", optopt); + } else { + fprintf(stderr, "Unknown option character '\\x%x'.\n", optopt); + } + return 1; + default: + fprintf(stderr, "Unknown option parsing failure\n"); + return 1; + } + } + + if (optind + 1 >= argc) { + usage_rewrite(); + return 1; + } + + const char *input_index_filename = argv[optind]; + const char *output_index_filename = argv[optind + 1]; + + if (strcmp(input_index_filename, output_index_filename) == 0) { + fprintf(stderr, "input and output must be different.\n"); + return 1; + } + + char *input_log_filename = sparkey_create_log_filename(input_index_filename); + if (input_log_filename == NULL) { + fprintf(stderr, "input filename must end with .spi but was '%s'\n", input_index_filename); + return 1; + } + + char *output_log_filename = sparkey_create_log_filename(output_index_filename); + if (output_log_filename == NULL) { + fprintf(stderr, "output filename must end with .spi but was '%s'\n", output_index_filename); + return 1; + } + + sparkey_hashreader *reader; + assert(sparkey_hash_open(&reader, input_index_filename, input_log_filename)); + sparkey_logreader *logreader = sparkey_hash_getreader(reader); + if (!compression_set) { + compression_type = sparkey_logreader_get_compression_type(logreader); + } + if (block_size == -1) { + block_size = sparkey_logreader_get_compression_blocksize(logreader); + } + + // TODO: skip rewrite if compression type and block size are unchanged, and there is no garbage in the log + + sparkey_logwriter *writer; + assert(sparkey_logwriter_create(&writer, output_log_filename, compression_type, block_size)); + + sparkey_logiter *iter; + assert(sparkey_logiter_create(&iter, logreader)); + + uint8_t *keybuf = malloc(sparkey_logreader_maxkeylen(logreader)); + uint8_t *valuebuf = malloc(sparkey_logreader_maxvaluelen(logreader)); + + while (1) { + assert(sparkey_logiter_next(iter, logreader)); + if (sparkey_logiter_state(iter) != SPARKEY_ITER_ACTIVE) { + break; + } + uint64_t wanted_keylen = sparkey_logiter_keylen(iter); + uint64_t actual_keylen; + assert(sparkey_logiter_fill_key(iter, logreader, wanted_keylen, keybuf, &actual_keylen)); + uint64_t wanted_valuelen = sparkey_logiter_valuelen(iter); + uint64_t actual_valuelen; + assert(sparkey_logiter_fill_value(iter, logreader, wanted_valuelen, valuebuf, &actual_valuelen)); + + assert(sparkey_logwriter_put(writer, wanted_keylen, keybuf, wanted_valuelen, valuebuf)); + } + free(keybuf); + free(valuebuf); + sparkey_logiter_close(&iter); + assert(sparkey_logwriter_close(&writer)); + sparkey_hash_close(&reader); + + writehash(output_index_filename, output_log_filename); + + return 0; + } else if (strcmp(command, "help") == 0 || strcmp(command, "--help") == 0 || strcmp(command, "-h") == 0) { usage(); + return 0; + } else { + fprintf(stderr, "Unknown command: %s\n", command); return 1; } } diff --git a/src/returncodes.h b/src/returncodes.h deleted file mode 100644 index 30ab1cf..0000000 --- a/src/returncodes.h +++ /dev/null @@ -1,68 +0,0 @@ -/* -* Copyright (c) 2012-2013 Spotify AB -* -* Licensed under the Apache License, Version 2.0 (the "License"); you may not -* use this file except in compliance with the License. You may obtain a copy of -* the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -* License for the specific language governing permissions and limitations under -* the License. -*/ -#ifndef SPARKEY_RETURNCODES_H -#define SPARKEY_RETURNCODES_H - -typedef enum { - SPARKEY_SUCCESS = 0, - SPARKEY_INTERNAL_ERROR = -1, - - SPARKEY_FILE_NOT_FOUND = -100, - SPARKEY_PERMISSION_DENIED = -101, - SPARKEY_TOO_MANY_OPEN_FILES = -102, - SPARKEY_FILE_TOO_LARGE = -103, - SPARKEY_FILE_ALREADY_EXISTS = -104, - SPARKEY_FILE_BUSY = -105, - SPARKEY_FILE_IS_DIRECTORY = -106, - SPARKEY_FILE_SIZE_EXCEEDED = -107, - SPARKEY_FILE_CLOSED = -108, - SPARKEY_OUT_OF_DISK = -109, - SPARKEY_UNEXPECTED_EOF = -110, - SPARKEY_MMAP_FAILED = -111, - - SPARKEY_WRONG_LOG_MAGIC_NUMBER = -200, - SPARKEY_WRONG_LOG_MAJOR_VERSION = -201, - SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION = -202, - SPARKEY_LOG_TOO_SMALL = -203, - SPARKEY_LOG_CLOSED = -204, - SPARKEY_LOG_ITERATOR_INACTIVE = -205, - SPARKEY_LOG_ITERATOR_MISMATCH = -206, - SPARKEY_LOG_ITERATOR_CLOSED = -207, - SPARKEY_LOG_HEADER_CORRUPT = -208, - SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE = -209, - SPARKEY_INVALID_COMPRESSION_TYPE = -210, - - SPARKEY_WRONG_HASH_MAGIC_NUMBER = -300, - SPARKEY_WRONG_HASH_MAJOR_VERSION = -301, - SPARKEY_UNSUPPORTED_HASH_MINOR_VERSION = -302, - SPARKEY_HASH_TOO_SMALL = -303, - SPARKEY_HASH_CLOSED = -304, - SPARKEY_FILE_IDENTIFIER_MISMATCH = -305, - SPARKEY_HASH_HEADER_CORRUPT = -306, - SPARKEY_HASH_SIZE_INVALID = -307, - -} sparkey_returncode; - -/** - * Get a human readable string from a return code. - * @param code a return code - * @returns a string representing the return code. - */ -const char * sparkey_errstring(sparkey_returncode code); - -#endif - - diff --git a/src/sparkey-internal.h b/src/sparkey-internal.h index 509122c..3c1175e 100644 --- a/src/sparkey-internal.h +++ b/src/sparkey-internal.h @@ -87,4 +87,13 @@ struct sparkey_hashreader { sparkey_returncode sparkey_logreader_open_noalloc(sparkey_logreader *log, const char *filename); void sparkey_logreader_close_nodealloc(sparkey_logreader *log); +struct sparkey_compressor { + uint32_t (*max_compressed_size)(uint32_t block_size); + sparkey_returncode (*decompress)(uint8_t *input, uint32_t compressed_size, uint8_t *output, uint32_t *uncompressed_size); + sparkey_returncode (*compress)(uint8_t *input, uint32_t uncompressed_size, uint8_t *output, uint32_t *compressed_size); +}; + +extern struct sparkey_compressor sparkey_compressors[3]; +int sparkey_uses_compressor(sparkey_compression_type t); + #endif diff --git a/src/sparkey.h b/src/sparkey.h index 2ee14ed..b6f46eb 100644 --- a/src/sparkey.h +++ b/src/sparkey.h @@ -39,8 +39,8 @@ * \endcode * - Write to the log: * \code - * const char *key = "mykey"; - * const char *value = "this is my value"; + * const char *mykey = "mykey"; + * const char *myvalue = "this is my value"; * sparkey_returncode returncode = sparkey_logwriter_put(mywriter, strlen(mykey), (uint8_t*)mykey, strlen(myvalue), (uint8_t*)myvalue); * // TODO: check the returncode * \endcode @@ -205,6 +205,10 @@ #include +#ifdef __cplusplus +extern "C" { +#endif + typedef enum { SPARKEY_SUCCESS = 0, SPARKEY_INTERNAL_ERROR = -1, @@ -262,7 +266,8 @@ typedef struct sparkey_logwriter sparkey_logwriter; typedef enum { SPARKEY_COMPRESSION_NONE, - SPARKEY_COMPRESSION_SNAPPY + SPARKEY_COMPRESSION_SNAPPY, + SPARKEY_COMPRESSION_ZSTD } sparkey_compression_type; typedef enum { @@ -289,7 +294,7 @@ typedef struct sparkey_hashreader sparkey_hashreader; /** * Creates a new Sparkey log file, possibly overwriting an already existing. - * @param log a reference to a reference to a sparkey_logwriter structure that gets allocated and initialized by this call. + * @param log a double reference to a sparkey_logwriter structure that gets allocated and initialized by this call. * @param filename the file to create. * @param compression_type NONE or SNAPPY, specifies if block compression should be used or not. * @param compression_block_size is only relevant if compression type is not NONE. @@ -300,12 +305,12 @@ sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log, const char /** * Append to an existing Sparkey log file. - * @param log a reference to an allocated but not initialized sparkey_logwriter struct. + * @param log a double reference to a sparkey_logwriter structure that gets allocated and initialized by this call. * @param filename the file to open for appending. * It represents the maximum number of bytes of an uncompressed block. * @return SPARKEY_SUCCESS if all goes well. */ -sparkey_returncode sparkey_logwriter_append(sparkey_logwriter *log, const char *filename); +sparkey_returncode sparkey_logwriter_append(sparkey_logwriter **log, const char *filename); /** * Append a key/value pair to the log file @@ -383,6 +388,20 @@ uint64_t sparkey_logreader_maxkeylen(sparkey_logreader *log); */ uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log); +/** + * Get the blocksize for a reader + * @param log a reference to a logreader. + * @returns the blocksize + */ +int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log); + +/** + * Get the compression type for a reader + * @param log a reference to a logreader. + * @returns the compression type + */ +sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log); + /** * Initializes a logiter and associates it with a logreader. * The logreader must be open. The logiter is not threadsafe. @@ -627,6 +646,8 @@ sparkey_returncode sparkey_logiter_hashnext(sparkey_logiter *iter, sparkey_hashr uint64_t sparkey_hash_numentries(sparkey_hashreader *reader); +uint64_t sparkey_hash_numcollisions(sparkey_hashreader *reader); + /* util */ /** @@ -637,5 +658,17 @@ uint64_t sparkey_hash_numentries(sparkey_hashreader *reader); */ char * sparkey_create_log_filename(const char *index_filename); +/** + * Allocates and creates a string denoting an index file from a log file. + * This is simply a string replacement of .spl$ to .spi$ + * @param log_filename the filename representing the log file + * @returns NULL if the log_filename does not end with ".spl" + */ +char * sparkey_create_index_filename(const char *log_filename); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/testsystem.c b/src/testsystem.c index cdd5778..a03b04a 100644 --- a/src/testsystem.c +++ b/src/testsystem.c @@ -229,6 +229,33 @@ void verify(sparkey_compression_type compression, int blocksize, int hashsize, i sparkey_logiter_close(&myiter); } +void verify_files_closed() { + // Verify that SPARKEY_FILE_IDENTIFIER_MISMATCH is returned appropriately. + sparkey_logwriter *writer; + assert_equals(SPARKEY_SUCCESS, sparkey_logwriter_create(&writer, "test1.spl", + SPARKEY_COMPRESSION_NONE, 4096)); + assert_equals(SPARKEY_SUCCESS, sparkey_logwriter_close(&writer)); + assert_equals(1, writer == NULL); + + assert_equals(SPARKEY_SUCCESS, sparkey_logwriter_create(&writer, "test2.spl", + SPARKEY_COMPRESSION_NONE, 4096)); + assert_equals(SPARKEY_SUCCESS, sparkey_logwriter_close(&writer)); + assert_equals(1, writer == NULL); + + // Now create a hash for test1. + assert_equals(SPARKEY_SUCCESS, sparkey_hash_write("test1.spi", "test1.spl", 0)); + + // and try to open the wrong files: + sparkey_hashreader* reader; + sparkey_returncode rc = sparkey_hash_open( + &reader, + "test1.spi", + "test2.spl" + ); + + assert_equals(SPARKEY_FILE_IDENTIFIER_MISMATCH, rc); +} + int main() { verify(SPARKEY_COMPRESSION_NONE, 0, 0, 0, 0, 0); verify(SPARKEY_COMPRESSION_NONE, 0, 0, 1, 0, 0); @@ -237,17 +264,21 @@ int main() { verify(SPARKEY_COMPRESSION_NONE, 0, 0, 0, 0, 100); verify(SPARKEY_COMPRESSION_NONE, 0, 0, 100, 10, 5); - verify(SPARKEY_COMPRESSION_SNAPPY, 10, 0, 100, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 20, 0, 100, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 0, 100, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 0, 1000, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 1000, 0, 1000, 0, 0); + for (sparkey_compression_type t = SPARKEY_COMPRESSION_SNAPPY; t <= SPARKEY_COMPRESSION_ZSTD; t++) { + verify(t, 10, 0, 100, 0, 0); + verify(t, 20, 0, 100, 0, 0); + verify(t, 100, 0, 100, 0, 0); + verify(t, 100, 0, 1000, 0, 0); + verify(t, 1000, 0, 1000, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 0, 1000, 100, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 0, 1000, 100, 50); + verify(t, 100, 0, 1000, 100, 0); + verify(t, 100, 0, 1000, 100, 50); + + verify(t, 100, 4, 1000, 0, 0); + verify(t, 100, 8, 1000, 0, 0); + } - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 4, 1000, 0, 0); - verify(SPARKEY_COMPRESSION_SNAPPY, 100, 8, 1000, 0, 0); + verify_files_closed(); printf("Success!\n"); } diff --git a/src/util.c b/src/util.c index 281d818..aeec183 100644 --- a/src/util.c +++ b/src/util.c @@ -30,7 +30,7 @@ sparkey_returncode sparkey_open_returncode(int e) { case ENOENT: return SPARKEY_FILE_NOT_FOUND; case EOVERFLOW: return SPARKEY_FILE_TOO_LARGE; default: - printf("_sparkey_open_returncode():%d error: errno = %d\n", __LINE__, e); + fprintf(stderr, "_sparkey_open_returncode():%d error: errno = %d\n", __LINE__, e); return SPARKEY_INTERNAL_ERROR; } } @@ -45,7 +45,7 @@ sparkey_returncode sparkey_create_returncode(int e) { case ENFILE: case EMFILE: return SPARKEY_TOO_MANY_OPEN_FILES; default: - printf("_sparkey_create_returncode():%d error: errno = %d\n", __LINE__, e); + fprintf(stderr, "_sparkey_create_returncode():%d error: errno = %d\n", __LINE__, e); return SPARKEY_INTERNAL_ERROR; } } @@ -59,27 +59,37 @@ sparkey_returncode sparkey_remove_returncode(int e) { case EISDIR: return SPARKEY_FILE_IS_DIRECTORY; case EOVERFLOW: return SPARKEY_FILE_TOO_LARGE; default: - printf("_sparkey_remove_returncode():%d error: errno = %d\n", __LINE__, e); + fprintf(stderr, "_sparkey_remove_returncode():%d error: errno = %d\n", __LINE__, e); return SPARKEY_INTERNAL_ERROR; } } -char * sparkey_create_log_filename(const char *index_filename) { - if (index_filename == NULL) return NULL; - size_t l = strlen(index_filename); +static inline char * _create_filename(const char *input, const char *from, char to) { + if (input == NULL) return NULL; + size_t l = strlen(input); // Paranoia - avoid ridiculously long filenames. if (l > 10000) return NULL; - // Too short to contain .spi - if (l < 4) return NULL; + // Too short to contain from + if (l < strlen(from)) return NULL; + + if (memcmp(&input[l - strlen(from)], from, strlen(from))) return NULL; + + size_t data_size = sizeof(char) * (l + 1); - if (memcmp(&index_filename[l - 4], ".spi", 4)) return NULL; + char *output = malloc(data_size); + if (output == NULL) return NULL; + memcpy(output, input, data_size); - char *log_filename = strdup(index_filename); - if (log_filename == NULL) return NULL; + output[l - 1] = to; + return output; +} - log_filename[l - 1] = 'l'; - return log_filename; +char * sparkey_create_log_filename(const char *index_filename) { + return _create_filename(index_filename, ".spi", 'l'); } +char * sparkey_create_index_filename(const char *log_filename) { + return _create_filename(log_filename, ".spl", 'i'); +}