diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 1f3c204502e..409011d32b9 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -149,20 +149,20 @@ struct ompi_osc_pt2pt_module_t { uint32_t *epoch_outgoing_frag_count; /** cyclic counter for a unique tage for long messages. */ - unsigned int tag_counter; - unsigned int rtag_counter; + uint32_t tag_counter; + uint32_t rtag_counter; /* Number of outgoing fragments that have completed since the begining of time */ - uint32_t outgoing_frag_count; + volatile uint32_t outgoing_frag_count; /* Next outgoing fragment count at which we want a signal on cond */ - uint32_t outgoing_frag_signal_count; + volatile uint32_t outgoing_frag_signal_count; /* Number of incoming fragments that have completed since the begining of time */ - uint32_t active_incoming_frag_count; + volatile uint32_t active_incoming_frag_count; /* Next incoming buffer count at which we want a signal on cond */ - uint32_t active_incoming_frag_signal_count; + volatile uint32_t active_incoming_frag_signal_count; /** Number of targets locked/being locked */ unsigned int passive_target_access_epoch; @@ -409,14 +409,6 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module, int tag, struct ompi_communicator_t *comm); -int ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module, - const void *buf, - size_t count, - struct ompi_datatype_t *datatype, - int dest, - int tag, - struct ompi_communicator_t *comm); - /** * ompi_osc_pt2pt_progress_pending_acc: * @@ -639,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending) opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super)); } -#define OSC_PT2PT_FRAG_TAG 0x10000 -#define OSC_PT2PT_FRAG_MASK 0x0ffff +#define OSC_PT2PT_FRAG_TAG 0x80000 +#define OSC_PT2PT_FRAG_MASK 0x7ffff /** * get_tag: @@ -658,11 +650,8 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module) /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int tmp = module->tag_counter + !!(module->passive_target_access_epoch); - - module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK; - - return tmp; + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } static inline int get_rtag(ompi_osc_pt2pt_module_t *module) @@ -670,11 +659,8 @@ static inline int get_rtag(ompi_osc_pt2pt_module_t *module) /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int tmp = module->rtag_counter + !!(module->passive_target_access_epoch); - - module->rtag_counter = (module->rtag_counter + 4) & OSC_PT2PT_FRAG_MASK; - - return tmp; + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->rtag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } /** * ompi_osc_pt2pt_accumulate_lock: diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index e169addb549..58d6b40b766 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -211,7 +211,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_sync_t *sync = &module->all_sync; - OPAL_THREAD_LOCK(&sync->lock); + OPAL_THREAD_LOCK(&module->lock); /* check if we are already in an access epoch */ if (ompi_osc_pt2pt_access_epoch_active (module)) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index b22f7837bf5..a1dcfd71721 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -34,27 +34,55 @@ #include /* progress an OSC request */ +static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request) +{ + ompi_osc_pt2pt_module_t *module = + (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; + + OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, + "isend_completion_cb called")); + + mark_outgoing_completion(module); + + /* put this request on the garbage colletion list */ + osc_pt2pt_gc_add_request (module, request); + + return OMPI_SUCCESS; +} + static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) { ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data; - ompi_osc_pt2pt_module_t *module = pt2pt_request->module; OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_req_comm_complete called tag = %d", request->req_status.MPI_TAG)); - mark_outgoing_completion (module); - if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) { ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR); } - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); + return ompi_osc_pt2pt_comm_complete (request); +} - return OMPI_SUCCESS; +static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t *module, const void *buf, + size_t count, ompi_datatype_t *datatype, int dest, + int tag, ompi_osc_pt2pt_request_t *request) +{ + /* increment the outgoing send count */ + ompi_osc_signal_outgoing (module, dest, 1); + + if (NULL != request) { + ++request->outstanding_requests; + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_req_comm_complete, request); + } + + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_comm_complete, module); } + static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) { ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data; @@ -282,14 +310,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -301,9 +329,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ tag = get_tag(module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg) { + /* wait for eager sends to be active before starting a long put */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -361,18 +388,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->tag = tag; osc_pt2pt_hton(header, proc); - /* increase the outgoing signal count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -380,14 +397,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (request || is_long_msg) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -459,14 +469,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -478,9 +488,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, tag = get_rtag (module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg || is_long_datatype) { + /* wait for synchronization before posting a long message */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -538,18 +547,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "acc: starting long accumulate with tag %d", tag)); - /* increment the outgoing send count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -561,14 +560,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (is_long_msg || request) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -639,7 +631,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar } frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -787,11 +779,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co ddt_len = ompi_datatype_pack_description_length(target_dt); frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -804,9 +796,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* for bookkeeping the get is "outgoing" */ ompi_osc_signal_outgoing (module, target, 1); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (!release_req) { + /* wait for epoch to begin before starting rget operation */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -857,14 +848,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co *request = &pt2pt_request->super; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, @@ -1003,14 +987,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin } frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -1030,9 +1014,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (!release_req) { + /* wait for epoch to begin before starting operation */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -1100,14 +1083,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin *request = (ompi_request_t *) pt2pt_request; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target_rank); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int origin_count, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 77ea7f861fa..418c6c4ec95 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, char *ptr; int ret; - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { memcpy (ptr, data, len); @@ -1683,33 +1683,6 @@ int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf, osc_pt2pt_incoming_req_complete, module); } - -static int -isend_completion_cb(ompi_request_t *request) -{ - ompi_osc_pt2pt_module_t *module = - (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; - - OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, - "isend_completion_cb called")); - - mark_outgoing_completion(module); - - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; -} - - -int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, const void *buf, - size_t count, struct ompi_datatype_t *datatype, - int dest, int tag, struct ompi_communicator_t *comm) -{ - return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm, - isend_completion_cb, module); -} - int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag, ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index 515ce82fdf8..f55e6cba09d 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -33,7 +33,8 @@ struct ompi_osc_pt2pt_frag_t { char *top; /* Number of operations which have started writing into the frag, but not yet completed doing so */ - int32_t pending; + volatile int32_t pending; + int32_t pending_long_sends; ompi_osc_pt2pt_frag_header_t *header; ompi_osc_pt2pt_module_t *module; }; @@ -44,12 +45,24 @@ extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); +static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_frag_t* buffer) +{ + opal_atomic_wmb (); + if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { + opal_atomic_mb (); + return ompi_osc_pt2pt_frag_start(module, buffer); + } + + return OMPI_SUCCESS; +} + /* * Note: module lock must be held during this operation */ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target, size_t request_len, ompi_osc_pt2pt_frag_t **buffer, - char **ptr) + char **ptr, bool long_send) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *curr; @@ -66,29 +79,21 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in OPAL_THREAD_LOCK(&module->lock); curr = peer->active_frag; - if (NULL == curr || curr->remain_len < request_len) { - opal_free_list_item_t *item = NULL; - - if (NULL != curr) { - curr->remain_len = 0; - peer->active_frag = NULL; - opal_atomic_mb (); - + if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) { + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) { /* If there's something pending, the pending finish will start the buffer. Otherwise, we need to start it now. */ - if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) { - ret = ompi_osc_pt2pt_frag_start(module, curr); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } + ret = ompi_osc_pt2pt_frag_finish (module, curr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return ret; } } - item = opal_free_list_get (&mca_osc_pt2pt_component.frags); - if (OPAL_UNLIKELY(NULL == item)) { + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags); + if (OPAL_UNLIKELY(NULL == curr)) { return OMPI_ERR_OUT_OF_RESOURCE; } - curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item; curr->target = target; @@ -96,7 +101,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->top = (char*) (curr->header + 1); curr->remain_len = mca_osc_pt2pt_component.buffer_size; curr->module = module; - curr->pending = 1; + curr->pending = 2; + curr->pending_long_sends = long_send; curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; @@ -104,12 +110,18 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; } curr->header->source = ompi_comm_rank(module->comm); - curr->header->num_ops = 0; + curr->header->num_ops = 1; if (curr->remain_len < request_len) { OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } + + peer->active_frag = curr; + } else { + OPAL_THREAD_ADD32(&curr->pending, 1); + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); + curr->pending_long_sends += long_send; } *ptr = curr->top; @@ -117,24 +129,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->top += request_len; curr->remain_len -= request_len; - OPAL_THREAD_UNLOCK(&module->lock); - OPAL_THREAD_ADD32(&curr->pending, 1); - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); - - return OMPI_SUCCESS; -} - - -/* - * Note: module lock must be held for this operation - */ -static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module, - ompi_osc_pt2pt_frag_t* buffer) -{ - if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { - return ompi_osc_pt2pt_frag_start(module, buffer); - } + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 0ddc4cf326e..099aa564624 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -244,6 +244,8 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module } } + } else { + lock->eager_send_active = true; } return OMPI_SUCCESS; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c index eddccf5b426..6741036e110 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c @@ -51,6 +51,7 @@ request_construct(ompi_osc_pt2pt_request_t *request) request->super.req_status._cancelled = 0; request->super.req_free = request_free; request->super.req_cancel = request_cancel; + request->outstanding_requests = 0; } OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_request_t, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h index 07b9d53093e..dee5c86892d 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -57,6 +57,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t); #define OMPI_OSC_PT2PT_REQUEST_RETURN(req) \ do { \ OMPI_REQUEST_FINI(&(req)->super); \ + (req)->outstanding_requests = 0; \ opal_free_list_return (&mca_osc_pt2pt_component.requests, \ (opal_free_list_item_t *) (req)); \ } while (0) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index eee29645c22..f4e4adcae0a 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -163,8 +163,10 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) { int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); if (0 == new_value) { + OPAL_THREAD_LOCK(&sync->lock); sync->eager_send_active = true; opal_condition_broadcast (&sync->cond); + OPAL_THREAD_UNLOCK(&sync->lock); } }