Skip to content

Appkins/streams #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 92 additions & 92 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,33 @@ project(${PROJECT} CXX)
###
# compilation options
###
if(WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 /bigobj")
if (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 /bigobj")

# was causing conflics with gtest build
string(REPLACE "/RTC1" "" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})
# was causing conflics with gtest build
string(REPLACE "/RTC1" "" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})

if("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "")
set(MSVC_RUNTIME_LIBRARY_CONFIG "/MT")
endif()
if ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "")
set(MSVC_RUNTIME_LIBRARY_CONFIG "/MT")
endif ()

foreach(flag_var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE)
if("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MT")
string(REPLACE "/MD" "/MT" ${flag_var} "${${flag_var}}")
elseif("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MD")
string(REPLACE "/MT" "/MD" ${flag_var} "${${flag_var}}")
else()
string(REPLACE "/MD" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
string(REPLACE "/MT" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
endif()
endforeach()
foreach (flag_var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE)
if ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MT")
string(REPLACE "/MD" "/MT" ${flag_var} "${${flag_var}}")
elseif ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MD")
string(REPLACE "/MT" "/MD" ${flag_var} "${${flag_var}}")
else ()
string(REPLACE "/MD" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
string(REPLACE "/MT" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
endif ()
endforeach ()

add_definitions(-D_UNICODE)
add_definitions(-DUNICODE)
add_definitions(-DWIN32_LEAN_AND_MEAN)
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
endif(WIN32)
add_definitions(-D_UNICODE)
add_definitions(-DUNICODE)
add_definitions(-DWIN32_LEAN_AND_MEAN)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
endif (WIN32)


###
Expand All @@ -81,9 +81,9 @@ find_library(TACOPIE_LIBRARY tacopie)
set(CPP_REDIS_INCLUDES ${PROJECT_SOURCE_DIR}/includes)
set(DEPS_INCLUDES ${PROJECT_SOURCE_DIR}/deps/include)

if(NOT USE_CUSTOM_TCP_CLIENT)
set(DEPS_INCLUDES ${DEPS_INCLUDES} ${TACOPIE_INCLUDE_DIR})
endif()
if (NOT USE_CUSTOM_TCP_CLIENT)
set(DEPS_INCLUDES ${DEPS_INCLUDES} ${TACOPIE_INCLUDE_DIR})
endif ()

set(DEPS_LIBRARIES ${PROJECT_SOURCE_DIR}/deps/lib)

Expand All @@ -98,31 +98,31 @@ include_directories(${CPP_REDIS_INCLUDES} ${DEPS_INCLUDES})
# sources
###
set(SRC_DIRS "sources"
"sources/builders"
"sources/core"
"sources/misc"
"sources/network"
"includes/cpp_redis"
"includes/cpp_redis/builders"
"includes/cpp_redis/core"
"includes/cpp_redis/misc"
"includes/cpp_redis/network")

foreach(dir ${SRC_DIRS})
# get directory sources and headers
file(GLOB s_${dir} "${dir}/*.cpp")
file(GLOB h_${dir} "${dir}/*.hpp")
file(GLOB i_${dir} "${dir}/*.ipp")

# set sources
set(SOURCES ${SOURCES} ${s_${dir}} ${h_${dir}} ${i_${dir}})
endforeach()
"sources/builders"
"sources/core"
"sources/misc"
"sources/network"
"includes/cpp_redis"
"includes/cpp_redis/builders"
"includes/cpp_redis/core"
"includes/cpp_redis/misc"
"includes/cpp_redis/network")

foreach (dir ${SRC_DIRS})
# get directory sources and headers
file(GLOB s_${dir} "${dir}/*.cpp")
file(GLOB h_${dir} "${dir}/*.hpp")
file(GLOB i_${dir} "${dir}/*.ipp")

# set sources
set(SOURCES ${SOURCES} ${s_${dir}} ${h_${dir}} ${i_${dir}})
endforeach ()
# filter tcp_client if no tacopie
if(USE_CUSTOM_TCP_CLIENT)
file(GLOB tacopie_cpp "sources/network/tcp_client.cpp")
file(GLOB tacopie_h "includes/cpp_redis/network/tcp_client.hpp")
list(REMOVE_ITEM SOURCES ${tacopie_cpp} ${tacopie_h})
endif(USE_CUSTOM_TCP_CLIENT)
if (USE_CUSTOM_TCP_CLIENT)
file(GLOB tacopie_cpp "sources/network/tcp_client.cpp")
file(GLOB tacopie_h "includes/cpp_redis/network/tcp_client.hpp")
list(REMOVE_ITEM SOURCES ${tacopie_cpp} ${tacopie_h})
endif (USE_CUSTOM_TCP_CLIENT)


###
Expand All @@ -144,39 +144,39 @@ configure_file("cpp_redis.pc.in" "${CMAKE_PKGCONFIG_OUTPUT_DIRECTORY}/cpp_redis.
add_library(${PROJECT} ${SOURCES})
set_property(TARGET ${PROJECT} PROPERTY POSITION_INDEPENDENT_CODE ON)

if(WIN32)
set_target_properties(${PROJECT}
PROPERTIES COMPILE_PDB_NAME ${PROJECT}
COMPILE_PDB_OUTPUT_DIRECTORY ${CMAKE_LIBRARY_OUTPUT_DIRECTORY})
endif(WIN32)
if (WIN32)
set_target_properties(${PROJECT}
PROPERTIES COMPILE_PDB_NAME ${PROJECT}
COMPILE_PDB_OUTPUT_DIRECTORY ${CMAKE_LIBRARY_OUTPUT_DIRECTORY})
endif (WIN32)

if(WIN32)
target_link_libraries(${PROJECT} ws2_32)
else()
target_link_libraries(${PROJECT} pthread)
endif(WIN32)
if (WIN32)
target_link_libraries(${PROJECT} ws2_32)
else ()
target_link_libraries(${PROJECT} pthread)
endif (WIN32)

if(TACOPIE_LIBRARY)
target_link_libraries(${PROJECT} ${TACOPIE_LIBRARY})
else()
target_link_libraries(${PROJECT} tacopie)
endif(TACOPIE_LIBRARY)
if (TACOPIE_LIBRARY)
target_link_libraries(${PROJECT} ${TACOPIE_LIBRARY})
else ()
target_link_libraries(${PROJECT} tacopie)
endif (TACOPIE_LIBRARY)


# __CPP_REDIS_READ_SIZE
if(READ_SIZE)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_READ_SIZE=${READ_SIZE}")
endif(READ_SIZE)
if (READ_SIZE)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_READ_SIZE=${READ_SIZE}")
endif (READ_SIZE)

# __CPP_REDIS_LOGGING_ENABLED
if(LOGGING_ENABLED)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}")
endif(LOGGING_ENABLED)
if (LOGGING_ENABLED)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}")
endif (LOGGING_ENABLED)

# __CPP_REDIS_USE_CUSTOM_TCP_CLIENT
if(USE_CUSTOM_TCP_CLIENT)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_USE_CUSTOM_TCP_CLIENT=${USE_CUSTOM_TCP_CLIENT}")
endif(USE_CUSTOM_TCP_CLIENT)
if (USE_CUSTOM_TCP_CLIENT)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_USE_CUSTOM_TCP_CLIENT=${USE_CUSTOM_TCP_CLIENT}")
endif (USE_CUSTOM_TCP_CLIENT)


###
Expand All @@ -194,30 +194,30 @@ install(DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include USE_SOURCE_PERMISSI
###
# examples
###
if(BUILD_EXAMPLES)
add_subdirectory(examples)
# Reset variable to false to ensure tacopie does no build examples
set(BUILD_EXAMPLES false)
endif(BUILD_EXAMPLES)
if (BUILD_EXAMPLES)
add_subdirectory(examples)
# Reset variable to false to ensure tacopie does no build examples
set(BUILD_EXAMPLES false)
endif (BUILD_EXAMPLES)

###
# tests
###
if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
ExternalProject_Add("googletest"
GIT_REPOSITORY "https://github.com/google/googletest.git"
CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${PROJECT_SOURCE_DIR}/deps")
# Reset variable to false to ensure tacopie does no build tests
set(BUILD_TESTS false)
endif(BUILD_TESTS)
if (BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
ExternalProject_Add("googletest"
GIT_REPOSITORY "https://github.com/google/googletest.git"
CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${PROJECT_SOURCE_DIR}/deps")
# Reset variable to false to ensure tacopie does no build tests
set(BUILD_TESTS false)
endif (BUILD_TESTS)


###
# tacopie
###
if(NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
set(SOURCES) # reset the SOURCES var so that the tacopie project won't include the cpp_redis sources too
add_subdirectory(tacopie)
endif(NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
if (NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
set(SOURCES) # reset the SOURCES var so that the tacopie project won't include the cpp_redis sources too
add_subdirectory(tacopie)
endif (NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
73 changes: 26 additions & 47 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,50 +39,29 @@ include_directories(${CPP_REDIS_INCLUDES})
###
link_directories(${DEPS_LIBRARIES})


###
# executable
###
add_executable(cpp_redis_client cpp_redis_client.cpp)
target_link_libraries(cpp_redis_client cpp_redis)

add_executable(cpp_redis_consumer cpp_redis_consumer.cpp)
target_link_libraries(cpp_redis_consumer cpp_redis)

add_executable(cpp_redis_future_client cpp_redis_future_client.cpp)
target_link_libraries(cpp_redis_future_client cpp_redis)

add_executable(cpp_redis_subscriber cpp_redis_subscriber.cpp)
target_link_libraries(cpp_redis_subscriber cpp_redis)

add_executable(cpp_redis_logger cpp_redis_logger.cpp)
target_link_libraries(cpp_redis_logger cpp_redis)

add_executable(cpp_redis_kill cpp_redis_kill.cpp)
target_link_libraries(cpp_redis_kill cpp_redis)

add_executable(cpp_redis_streams_client cpp_redis_streams_client.cpp)
target_link_libraries(cpp_redis_streams_client cpp_redis)

add_executable(cpp_redis_high_availability_client cpp_redis_high_availability_client.cpp)
target_link_libraries(cpp_redis_high_availability_client cpp_redis)


###
# link libs
###
if(WIN32)
target_link_libraries(cpp_redis_client ws2_32)
target_link_libraries(cpp_redis_future_client ws2_32)
target_link_libraries(cpp_redis_subscriber ws2_32)
target_link_libraries(cpp_redis_logger ws2_32)
target_link_libraries(cpp_redis_kill ws2_32)
target_link_libraries(cpp_redis_high_availability_client ws2_32)
else()
target_link_libraries(cpp_redis_client pthread)
target_link_libraries(cpp_redis_future_client pthread)
target_link_libraries(cpp_redis_subscriber pthread)
target_link_libraries(cpp_redis_logger pthread)
target_link_libraries(cpp_redis_kill pthread)
target_link_libraries(cpp_redis_high_availability_client pthread)
endif(WIN32)
set(EXAMPLES cpp_redis_client
cpp_redis_consumer
cpp_redis_future_client
cpp_redis_subscriber
cpp_redis_logger
cpp_redis_kill
cpp_redis_streams_client
cpp_redis_high_availability_client
)

foreach(EXAMPLE IN ITEMS ${EXAMPLES})
###
# executable
###
add_executable(${EXAMPLE} ${EXAMPLE}.cpp)
target_link_libraries(${EXAMPLE} cpp_redis)

###
# link libs
###
if(WIN32)
target_link_libraries(${EXAMPLE} ws2_32)
else()
target_link_libraries(${EXAMPLE} pthread)
endif(WIN32)
endforeach(EXAMPLE)
2 changes: 1 addition & 1 deletion examples/cpp_redis_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ main(void) {
}
});

auto replcmd = [](cpp_redis::reply &reply) {
auto replcmd = [](const cpp_redis::reply &reply) {
std::cout << "set hello 42: " << reply << std::endl;
// if (reply.is_string())
// do_something_with_string(reply.as_string())
Expand Down
38 changes: 32 additions & 6 deletions examples/cpp_redis_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sigint_handler(int) {
}

int
main(void) {
main() {
#ifdef _WIN32
//! Windows netword DLL init
WORD version = MAKEWORD(2, 2);
Expand All @@ -53,7 +53,8 @@ main(void) {

//! Enable logging

const std::string group_name = "groupone";
//const std::string group_name = "groupone";
const std::vector<std::string> group_names = {"groupone"}; //, "grouptwo"};
const std::string session_name = "sessone";
const std::string consumer_name = "ABCD";

Expand All @@ -68,11 +69,36 @@ main(void) {
}
});

sub.subscribe(group_name, [](const cpp_redis::message_type msg){
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
sub.auth("{redis_key}");

for (auto &group : group_names) {

sub.subscribe(group,
[group](const cpp_redis::message_type msg) {
cpp_redis::consumer_response_t res;
// Callback will run for each message obtained from the queue
std::cout << "Group: " << group << std::endl;
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
res.insert({"Id", msg.get_id()});
return res;
},
[group](int ack_status) {
// Callback will run upon return of xack
std::cout << "Group: " << group << std::endl;
std::cout << "Ack status: " << ack_status << std::endl;
});
}

return msg;
});
/*sub.subscribe(group_name,
[](const cpp_redis::message_type msg) {
// Callback will run for each message obtained from the queue
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
return msg;
},
[](int ack_status) {
// Callback will run upon return of xack
std::cout << "Ack status: " << ack_status << std::endl;
});*/

sub.commit();

Expand Down
2 changes: 1 addition & 1 deletion examples/cpp_redis_streams_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#endif /* _WIN32 */

int
main(void) {
main() {
#ifdef _WIN32
//! Windows netword DLL init
WORD version = MAKEWORD(2, 2);
Expand Down
Loading