
MPIStream.cc

// -*- C++ -*-

// PLearn (A C++ Machine Learning Library)
// Copyright (C) 1998 Pascal Vincent
// Copyright (C) 1999-2002 Pascal Vincent, Yoshua Bengio and University of Montreal
//

// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright
//     notice, this list of conditions and the following disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright
//     notice, this list of conditions and the following disclaimer in the
//     documentation and/or other materials provided with the distribution.
//
//  3. The name of the authors may not be used to endorse or promote
//     products derived from this software without specific prior written
//     permission.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
// NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// This file is part of the PLearn library. For more information on the PLearn
// library, go to the PLearn Web site at www.plearn.org


/* *******************************************************
 * $Id: MPIStream.cc,v 1.5 2004/02/26 06:49:40 nova77 Exp $
 * AUTHORS: Pascal Vincent & Yoshua Bengio
 * This file is part of the PLearn library.
 ******************************************************* */

#include "MPIStream.h"

#if USING_MPI

namespace PLearn {
using namespace std;


// ******************
// ** MPIStreambuf **
// ******************

MPIStreambuf::MPIStreambuf(int the_peerrank, int inbufsize)
    :tag(PLMPI::tag), peerrank(the_peerrank),
     inbuf(0), inbuf_capacity(0)
{
    if(inbufsize>0)
        reserveInputBuffer(inbufsize);
}


MPIStreambuf::~MPIStreambuf()
{
    if(inbuf)
        delete[] inbuf;
}

streambuf* MPIStreambuf::setbuf(char* p, int len)
{
    if(p && len>0)
        setp(p, p+len-1); // -1 to keep space for the extra character passed to overflow()
    else
        setp(0,0);
    return this;
}

int MPIStreambuf::showmanyc()
{
    int ready;
    MPI_Status status;
    MPI_Iprobe(peerrank, tag, MPI_COMM_WORLD, &ready, &status);
    if(ready)
    {
        int nready;
        MPI_Get_count(&status, MPI_CHAR, &nready);
        return nready;
    }
    return -1;
}

int MPIStreambuf::underflow()
{
    // cerr<< "[ " << PLMPI::rank << " Entering underflow]";
    int msglength;
    MPI_Status status;
    MPI_Probe(peerrank, tag, MPI_COMM_WORLD, &status);
    MPI_Get_count(&status, MPI_CHAR, &msglength);
    reserveInputBuffer(msglength);
    MPI_Recv(inbuf, msglength, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD, &status);

    setg(inbuf, inbuf, inbuf+msglength);
    // cerr<< "[ " << PLMPI::rank << " msglen=" << msglength << " Leaving underflow]";
    return (unsigned char)*inbuf; // cast so characters >= 128 are not mistaken for EOF
}


int MPIStreambuf::overflow(int c)
{
    // cerr<< "[ " << PLMPI::rank << " Entering overflow]";
    if(c!=EOF)
    {
        if(pbase()) // buffered mode
        {
            streamsize n = pptr() - pbase();
            *pptr() = char(c); // put c in the extra space reserved by setbuf()
            if( MPI_Send(pbase(), n+1, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD) != MPI_SUCCESS )
                return EOF;
            pbump(-n);
        }
        else // unbuffered mode
        {
            char tinybuf[1];
            tinybuf[0] = c;
            if( MPI_Send(tinybuf, 1, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD) != MPI_SUCCESS )
                return EOF;
        }
    }
    else // the extra char is EOF: just flush what is in the buffer
    {
        if(pbase()) // buffered mode
        {
            streamsize n = pptr() - pbase();
            if( MPI_Send(pbase(), n, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD) != MPI_SUCCESS )
                return EOF;
            pbump(-n);
        }
    }
    // cerr<< "[ " << PLMPI::rank << " Leaving overflow]";
    return 0;
}

int MPIStreambuf::sync()
{
    // cerr<< "[ " << PLMPI::rank << " Entering sync]";
    streamsize n = pptr() - pbase();
    if(n>0)
    {
        if( MPI_Send(pbase(), n, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD) != MPI_SUCCESS )
            return EOF;
        pbump(-n);
    }

    // cerr<< "[ " << PLMPI::rank << " Leaving sync]";
    return 0;
}

// Smart implementation of xsputn:
// if n is larger than the remaining buffer space, the message is sent
// directly, so no time is wasted copying the data into the buffer
// unnecessarily (e.g. when sending a long vector).
streamsize MPIStreambuf::xsputn(const char* s, streamsize n)
{
    // cerr<< "[ " << PLMPI::rank << " Entering xsputn]";
    if(n > epptr()-pptr()) // n is larger than the remaining buffer space
    {
        // Don't waste time copying the data into the buffer: first flush
        // whatever is already buffered, then send s directly.
        sync();
        if( MPI_Send((char *)s, n, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD) != MPI_SUCCESS )
            return 0;
    }
    else // call the default method, which copies s into the output buffer
    {
        return streambuf::xsputn(s, n); // returns the number of chars actually written
    }
    // cerr<< "[ " << PLMPI::rank << " Leaving xsputn]";
    return n;
}

// Smart implementation of xsgetn:
// if the input buffer is exhausted and the length of the next message is
// exactly n, MPI_Recv fills s directly rather than going through the input buffer.
streamsize MPIStreambuf::xsgetn(char* s, streamsize n)
{
    // cerr<< "[ " << PLMPI::rank << " Entering xsgetn]";
    int msglength = -1;
    MPI_Status status;
    if(gptr()==egptr()) // input buffer exhausted (not merely positioned at its start)
    {
        MPI_Probe(peerrank, tag, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_CHAR, &msglength);
    }

    if(msglength == n)
    {
        if( MPI_Recv(s, n, MPI_CHAR, peerrank, tag, MPI_COMM_WORLD, &status) != MPI_SUCCESS )
            return 0;
    }
    else // call the default method, which goes through the input buffer
    {
        return streambuf::xsgetn(s, n); // returns the number of chars actually read
    }
    // cerr<< "[ " << PLMPI::rank << " Leaving xsgetn]";
    return n;
}


// ***************
// ** MPIStream **
// ***************

void MPIStream::init(int the_peerrank, int inbufsize, int outbufsize)
{
    // cerr<< "[ " << PLMPI::rank << " Entering init]";
    rdbuf(new MPIStreambuf(the_peerrank, inbufsize));
    if(outbufsize<=1) // unbuffered output
#if __GNUC__ < 3 && !defined(WIN32)
        rdbuf()->setbuf(0,0);
#else
        rdbuf()->pubsetbuf(0,0);
#endif
    else
    {
        outbuffer = new char[outbufsize];
#if __GNUC__ < 3 && !defined(WIN32)
        rdbuf()->setbuf(outbuffer,outbufsize);
#else
        rdbuf()->pubsetbuf(outbuffer,outbufsize);
#endif
    }
    // cerr<< "[ " << PLMPI::rank << " Leaving init]";
}

MPIStream::~MPIStream()
{
    flush();
    delete rdbuf(0); // detach and delete the MPIStreambuf
    if(outbuffer)
        delete[] outbuffer;
}

// ****************
// ** MPIStreams **
// ****************

MPIStreams::MPIStreams(int inbufsize, int outbufsize)
{
    if(PLMPI::size==0)
        mpistreams = 0;
    else
    {
        mpistreams = new MPIStream[PLMPI::size];
        for(int k=0; k<PLMPI::size; k++)
            mpistreams[k].init(k, inbufsize, outbufsize);
    }
}


} // end of namespace PLearn

// end of #if USING_MPI
#endif
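
Usage sketch (not part of MPIStream.cc): the snippet below illustrates how these classes are meant to fit together, with each flush on an MPIStream producing exactly one MPI message for the peer. It is a minimal sketch, assuming MPI and the PLMPI globals (rank, size, tag) have already been initialized by PLearn's MPI setup code, that MPIStreams provides an operator[] returning the MPIStream connected to the given peer rank, and with arbitrary illustrative buffer sizes.

#include <string>
#include "MPIStream.h"

using namespace std;
using namespace PLearn;

void ping_pong()
{
    // One MPIStream per process in MPI_COMM_WORLD,
    // with 200-byte input and output buffers (illustrative values).
    MPIStreams streams(200, 200);

    if(PLMPI::rank==0)
    {
        streams[1] << "hello" << endl; // endl flushes: sync() issues one MPI_Send to rank 1
        int answer;
        streams[1] >> answer;          // blocks in underflow() until rank 1's reply arrives
    }
    else if(PLMPI::rank==1)
    {
        string word;
        streams[0] >> word;
        streams[0] << 42 << endl;
    }
}

Because a flush maps to a single MPI_Send, writers should flush at message boundaries; on the receiving side, underflow() pulls one whole message at a time into the input buffer.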
