Skip to content

part-persist: implement message aggregation #13039

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ompi/mca/part/base/part_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ static int mca_part_base_open(mca_base_open_flag_t flags)

mca_part_base_selected_component.partm_finalize = NULL;

/* Currently this uses a default with no selection criteria as there is only 1 module. */
opal_pointer_array_add(&mca_part_base_part, strdup("persist"));
opal_pointer_array_add(&mca_part_base_part, strdup("persist_aggregated"));

return OMPI_SUCCESS;
}
Expand Down
55 changes: 55 additions & 0 deletions ompi/mca/part/persist_aggregated/Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2004-2006 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2009-2024 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).

# Distributed with the tarball but not compiled into the component.
EXTRA_DIST = post_configure.sh

# Exactly one of the two variables below is non-empty: a DSO build installs
# mca_part_persist_aggregated.la, any other build produces the non-installed
# libmca_part_persist_aggregated.la instead.
if MCA_BUILD_ompi_part_persist_aggregated_DSO
component_noinst =
component_install = mca_part_persist_aggregated.la
else
component_noinst = libmca_part_persist_aggregated.la
component_install =
endif

# Sources and headers shared by both library flavours.
local_sources = \
part_persist_aggregated.c \
part_persist_aggregated.h \
part_persist_aggregated_component.c \
part_persist_aggregated_component.h \
part_persist_aggregated_recvreq.h \
part_persist_aggregated_recvreq.c \
part_persist_aggregated_request.h \
part_persist_aggregated_request.c \
part_persist_aggregated_sendreq.h \
part_persist_aggregated_sendreq.c \
schemes/part_persist_aggregated_scheme_regular.h \
schemes/part_persist_aggregated_scheme_regular.c

# DSO flavour: installed into $(ompilibdir) and linked against the main MPI
# library, whose file name is substituted by configure (@OMPI_LIBMPI_NAME@).
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_part_persist_aggregated_la_SOURCES = $(local_sources)
mca_part_persist_aggregated_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \
    $(part_persist_aggregated_LIBS)
mca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS)

# Non-installed flavour: built from the same sources, without the MPI library
# in LIBADD.
noinst_LTLIBRARIES = $(component_noinst)
libmca_part_persist_aggregated_la_SOURCES = $(local_sources)
libmca_part_persist_aggregated_la_LIBADD = $(part_persist_aggregated_LIBS)
libmca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS)

652 changes: 652 additions & 0 deletions ompi/mca/part/persist_aggregated/part_persist_aggregated.c

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions ompi/mca/part/persist_aggregated/part_persist_aggregated.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2015-2024 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved
* Copyright (c) 2019-2021 The University of Tennessee at Chattanooga and The University
* of Tennessee Research Foundation. All rights reserved.
* Copyright (c) 2019-2021 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2021 University of Alabama at Birmingham. All rights reserved.
* Copyright (c) 2021 Tennessee Technological University. All rights reserved.
* Copyright (c) 2021 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2021 Bull S.A.S. All rights reserved.
* Copyright (c) 2024 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#ifndef PART_PERSIST_AGGREGATED_H
#define PART_PERSIST_AGGREGATED_H

#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif

#include <math.h>

#include "ompi_config.h"
#include "ompi/request/request.h"
#include "ompi/mca/part/part.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "opal/sys/atomic.h"

#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h"
#include "ompi/mca/part/base/part_base_precvreq.h"
#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h"
#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h"
#include "ompi/message/message.h"
#include "ompi/mca/pml/pml.h"
BEGIN_C_DECLS

/**
 * List element that carries a pointer to a partitioned-communication request.
 *
 * NOTE(review): presumably these elements are what gets stored in
 * ompi_part_persist_aggregated.progress_list -- confirm against
 * part_persist_aggregated.c.
 */
typedef struct mca_part_persist_aggregated_list_t {
/* Base list item, so the element can be a member of an opal_list_t. */
opal_list_item_t super;
/* The request this element refers to. */
mca_part_persist_aggregated_request_t *item;
} mca_part_persist_aggregated_list_t;

/* Class declaration for the list element type defined above. */
OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_list_t);


/**
 * Global state of the persist_aggregated part component.
 *
 * A single instance, ompi_part_persist_aggregated, is declared below. Several
 * fields are given their initial values when the component is registered and
 * opened (see part_persist_aggregated_component.c).
 */
struct ompi_part_persist_aggregated_t {
/* Base part module; the component's init hook returns a pointer to this member. */
mca_part_base_module_t super;
/* Sizing of the request free lists: initial size, maximum size and growth
 * increment. Backed by the MCA parameters of the same names. */
int free_list_num;
int free_list_max;
int free_list_inc;
/* Allocated with OBJ_NEW(opal_list_t) when the component is opened. */
opal_list_t *progress_list;

int32_t next_send_tag; /**< This is a counter for send tags for the actual data transfer. */
/* Reset to 0 together with next_send_tag at component open.
 * NOTE(review): presumably the receive-side counterpart -- confirm. */
int32_t next_recv_tag;
ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */
/* NOTE(review): presumably the request and ready flag associated with
 * part_comm -- confirm against part_persist_aggregated.c. */
ompi_request_t *part_comm_req;
int32_t part_comm_ready;
ompi_communicator_t *part_comm_setup; /* We create a second communicator to send set-up messages (rationale: these
messages go in the opposite direction of normal messages, need to use MPI_ANY_SOURCE
to support different communicators, and thus need to have a unique tag). Because tags
are controlled by the sender in this model, we cannot assume that the tag will be
unused in part_comm. */
/* NOTE(review): presumably the request and ready flag associated with
 * part_comm_setup -- confirm against part_persist_aggregated.c. */
ompi_request_t *part_comm_sreq;
int32_t part_comm_sready;
/* Set to 0 and -1 respectively at component open.
 * NOTE(review): exact meaning is not visible in this header -- confirm. */
int32_t init_comms;
int32_t init_world;
int32_t my_world_rank; /* Because the back end communicators use a world rank, we need to communicate ours
to set up the requests. */

uint32_t min_message_size; /* parameters to control internal partitioning */
/* MCA parameter "max_message_count": maximal number of transferred messages
 * (internal partitions). */
uint32_t max_message_count;

/* Set to 0 at component open.
 * NOTE(review): looks like a guard for a critical section -- confirm. */
opal_atomic_int32_t block_entry;
/* Constructed at component open and destructed at component close. */
opal_mutex_t lock;
};
typedef struct ompi_part_persist_aggregated_t ompi_part_persist_aggregated_t;
/* The single component-wide instance; this header only declares it. */
extern ompi_part_persist_aggregated_t ompi_part_persist_aggregated;

/*
 * Request entry points of this component; only declared here, the definitions
 * are not part of this header.
 * NOTE(review): presumably the size_t argument of start is the number of
 * requests in the array that follows, and free releases the request behind the
 * given handle -- confirm against part_persist_aggregated.c.
 */
int mca_part_persist_aggregated_start(size_t, ompi_request_t**);
int mca_part_persist_aggregated_free(ompi_request_t**);

END_C_DECLS

#endif /* PART_PERSIST_AGGREGATED_H */
184 changes: 184 additions & 0 deletions ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2021 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2024 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2024 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"

#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h"

#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h"
#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h"
#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h"

/*
 * Forward declarations of the file-local hooks that are referenced by the
 * component descriptor below and defined further down in this file.
 */
static int mca_part_persist_aggregated_component_register(void);
static int mca_part_persist_aggregated_component_open(void);
static int mca_part_persist_aggregated_component_close(void);
static mca_part_base_module_t* mca_part_persist_aggregated_component_init( int* priority,
bool enable_progress_threads, bool enable_mpi_threads);
static int mca_part_persist_aggregated_component_fini(void);

/*
 * Component descriptor for "persist_aggregated". It wires the file-local
 * register/open/close/init/finalize hooks into the part framework.
 *
 * NOTE(review): the descriptor type is versioned 4_0_0 while the version macro
 * used below is MCA_PART_BASE_VERSION_2_0_0 -- confirm this pairing is intended.
 */
mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component = {
/* First, the mca_base_component_t struct containing meta
* information about the component itself */

.partm_version = {
MCA_PART_BASE_VERSION_2_0_0,

/* Component name and version, followed by the generic MCA hooks. */
.mca_component_name = "persist_aggregated",
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION),
.mca_open_component = mca_part_persist_aggregated_component_open,
.mca_close_component = mca_part_persist_aggregated_component_close,
.mca_register_component_params = mca_part_persist_aggregated_component_register,
},
.partm_data = {
/* This component is not checkpoint ready */
MCA_BASE_METADATA_PARAM_NONE
},

/* Part-framework specific hooks. */
.partm_init = mca_part_persist_aggregated_component_init,
.partm_finalize = mca_part_persist_aggregated_component_fini,
};

/**
 * Register the MCA parameters of this component.
 *
 * Each parameter is stored directly in a field of
 * ompi_part_persist_aggregated. Every field receives its built-in value before
 * the corresponding registration call is made. The results of the individual
 * registrations are deliberately discarded.
 *
 * @return OPAL_SUCCESS, unconditionally.
 */
static int mca_part_persist_aggregated_component_register(void)
{
    /* Built-in values for the request free lists. */
    ompi_part_persist_aggregated.free_list_num = 4;
    ompi_part_persist_aggregated.free_list_max = -1;
    ompi_part_persist_aggregated.free_list_inc = 64;

    /* Built-in limits for the internal partitioning. */
    ompi_part_persist_aggregated.min_message_size = 4096;
    ompi_part_persist_aggregated.max_message_count = 4096;

    /* Request free list parameters (signed integers). */
    (void) mca_base_component_var_register(
        &mca_part_persist_aggregated_component.partm_version,
        "free_list_num", "Initial size of request free lists",
        MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
        OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
        &ompi_part_persist_aggregated.free_list_num);

    (void) mca_base_component_var_register(
        &mca_part_persist_aggregated_component.partm_version,
        "free_list_max", "Maximum size of request free lists",
        MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
        OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
        &ompi_part_persist_aggregated.free_list_max);

    (void) mca_base_component_var_register(
        &mca_part_persist_aggregated_component.partm_version,
        "free_list_inc", "Number of elements to add when growing request free lists",
        MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
        OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
        &ompi_part_persist_aggregated.free_list_inc);

    /* Internal partitioning parameters (unsigned integers). */
    (void) mca_base_component_var_register(
        &mca_part_persist_aggregated_component.partm_version,
        "min_message_size", "Minimal size of transferred messages (internal partitions)",
        MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0,
        OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
        &ompi_part_persist_aggregated.min_message_size);

    (void) mca_base_component_var_register(
        &mca_part_persist_aggregated_component.partm_version,
        "max_message_count", "Maximal number of transferred messages (internal partitions)",
        MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0,
        OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
        &ompi_part_persist_aggregated.max_message_count);

    return OPAL_SUCCESS;
}

/**
 * Set up the containers used by the component.
 *
 * Initialises the receive- and send-request free lists with the sizing taken
 * from ompi_part_persist_aggregated, then allocates the progress list. The
 * results of the free list initialisations are not checked.
 */
static void mca_part_persist_aggregated_init_lists(void)
{
    /* Both free lists share the same sizing. */
    const int initial_count = ompi_part_persist_aggregated.free_list_num;
    const int maximum_count = ompi_part_persist_aggregated.free_list_max;
    const int growth_step = ompi_part_persist_aggregated.free_list_inc;

    /* Partitioned receive requests. */
    opal_free_list_init(&mca_part_base_precv_requests,
                        sizeof(mca_part_persist_aggregated_precv_request_t),
                        opal_cache_line_size,
                        OBJ_CLASS(mca_part_persist_aggregated_precv_request_t),
                        0, opal_cache_line_size,
                        initial_count, maximum_count, growth_step,
                        NULL, 0, NULL, NULL, NULL);

    /* Partitioned send requests. */
    opal_free_list_init(&mca_part_base_psend_requests,
                        sizeof(mca_part_persist_aggregated_psend_request_t),
                        opal_cache_line_size,
                        OBJ_CLASS(mca_part_persist_aggregated_psend_request_t),
                        0, opal_cache_line_size,
                        initial_count, maximum_count, growth_step,
                        NULL, 0, NULL, NULL, NULL);

    ompi_part_persist_aggregated.progress_list = OBJ_NEW(opal_list_t);
}

/**
 * MCA open hook: bring the component's global state to its initial values.
 *
 * Constructs the component lock, resets both tag counters, sets up the request
 * free lists and the progress list, and clears the communicator
 * initialisation state.
 *
 * @return OMPI_SUCCESS, unconditionally.
 */
static int
mca_part_persist_aggregated_component_open(void)
{
    OBJ_CONSTRUCT(&ompi_part_persist_aggregated.lock, opal_mutex_t);

    /* Counters for the tags of the actual data transfers. */
    ompi_part_persist_aggregated.next_send_tag = 0;
    ompi_part_persist_aggregated.next_recv_tag = 0;

    mca_part_persist_aggregated_init_lists();

    ompi_part_persist_aggregated.init_comms = 0;
    ompi_part_persist_aggregated.init_world = -1;

    /* Clear both ready flags: part_comm_ready and part_comm_sready (the latter
     * is presumably the flag belonging to part_comm_setup). */
    ompi_part_persist_aggregated.part_comm_ready = 0;
    ompi_part_persist_aggregated.part_comm_sready = 0;

    ompi_part_persist_aggregated.block_entry = 0;
    return OMPI_SUCCESS;
}


/**
 * MCA close hook: destruct the component lock.
 *
 * NOTE(review): the progress list allocated while opening the component is
 * not released here -- confirm that it is released elsewhere.
 *
 * @return OMPI_SUCCESS, unconditionally.
 */
static int mca_part_persist_aggregated_component_close(void)
{
    OBJ_DESTRUCT(&ompi_part_persist_aggregated.lock);

    return OMPI_SUCCESS;
}


/**
 * Part-framework init hook: report this component's priority and hand out its
 * module.
 *
 * @param[out] priority                 Set to 1.
 * @param[in]  enable_progress_threads  Not used.
 * @param[in]  enable_mpi_threads       Not used.
 *
 * @return Pointer to the module embedded in ompi_part_persist_aggregated.
 */
static mca_part_base_module_t *mca_part_persist_aggregated_component_init(int *priority,
                                                                          bool enable_progress_threads,
                                                                          bool enable_mpi_threads)
{
    const int component_priority = 1;

    (void) enable_progress_threads;
    (void) enable_mpi_threads;

    *priority = component_priority;

    opal_output_verbose(10, 0, "in persist part priority is %d\n", component_priority);

    return &ompi_part_persist_aggregated.super;
}


/**
 * Part-framework finalize hook; performs no work.
 *
 * @return OMPI_SUCCESS, unconditionally.
 */
static int mca_part_persist_aggregated_component_fini(void)
{
    return OMPI_SUCCESS;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2024 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/

/*
 * The include guard is named after this component so that it cannot collide
 * with the guard of another component's header.
 *
 * This header includes nothing itself: BEGIN_C_DECLS and
 * mca_part_base_component_4_0_0_t must already be declared by the including
 * file.
 */
#ifndef MCA_PART_PERSIST_AGGREGATED_COMPONENT_H
#define MCA_PART_PERSIST_AGGREGATED_COMPONENT_H

BEGIN_C_DECLS

/*
 * Component descriptor of the persist_aggregated part component.
 */
OMPI_DECLSPEC extern mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component;

END_C_DECLS

#endif /* MCA_PART_PERSIST_AGGREGATED_COMPONENT_H */
Loading
Loading