We consider the one-dimensional (1D) diffusion equation in a periodic domain
,
where is the diffusion coefficient. Finite differentiations of derivatives yield the expression,
.
is the time step and is the grid spacing. Space and time are discretized by , where and are the origins (nothing depends on them). denotes the value of on the grid at the time step . By solving the equation for , we obtain
.
The expression means that we can calculate at the next time step from at the current time step . This finite difference solution of the 1D diffusion equation
is coded by Fortran 90 as follows,
(download sample source code:
diffuse_serial.f90)
1 program diffusion_serial
2 implicit none
3 integer :: NX0=512 ! grid points [input]
4 integer :: NT=100000 ! iterations [input]
5 character (len=100) :: DATAFILE ! data file name [input]
6 integer, parameter :: NBX=1 ! boundary grid points
7 integer, parameter :: NOUT=1000 ! output size
8 real, parameter :: DC=1. ! diffusion coefficient
9 real, parameter :: LENGTH=2. ! domain size [-LENGTH/2:LENGTH/2]
10 integer :: nx
11 integer :: ix,it
12 real, allocatable :: f(:),df(:),x(:)
13 real, allocatable :: fp(:),xp(:)
14 real :: dx
15 real :: t,dt
16 real :: pi
17 real :: fmin,fmax
18 real :: total,total0
19 character (len=100), parameter :: MINMAXFILE='minmax.dat'
20 real, parameter :: INFINITY=1.e15
21 namelist /input/ NX0,NT,DATAFILE
22 namelist /minmax/ fmin,fmax
23 ! input
24 read(5,input)
25 DATAFILE=trim(DATAFILE)
26 nx=NX0+2*NBX
27 allocate(f(nx),df(nx),x(nx))
28 allocate(fp(NX0+1),xp(NX0+1))
29 pi = 4.*atan(1.)
30 dx = LENGTH/NX0
31 do ix = 1, nx
32 x(ix) = dx*(ix-NBX-.5) - .5*LENGTH
33 end do
34 xp(1:NX0+1)= .5*(x(NBX:NX0+NBX)+x(NBX+1:NX0+NBX+1))
35 dt = .5 * dx * dx / DC
36 t = 0.
37 !
38 ! initialize array
39 !
40 f(1:nx)=0.
41 df(1:nx)=0.
42 !
43 ! initial condition
44 !
45 f(1:nx) = .5*(cos(pi*x(1:nx))+1.)
46 fp(1:NX0+1)= .5*(f(NBX:NX0+NBX)+f(NBX+1:NX0+NBX+1))
47 fmin= INFINITY
48 fmax=-INFINITY
49 do ix=1,NX0+1
50 fmin=min(fmin,fp(ix))
51 fmax=max(fmax,fp(ix))
52 end do
53 !
54 ! write initial data
55 !
56 open(10,file=DATAFILE,form='unformatted')
57 write(10) xp
58 write(10) t,fp
59 total=sum(f(1+NBX:NX0+NBX))*dx
60 total0=total
61 write(6,'(3(a,f12.4),a)') &
62 & 'time = ',t, &
63 & ', total = ',total, &
64 & ', difference = ',(total0-total)/total0*100,' [%]'
65 !
66 ! main routine
67 !
68 do it = 1, NT
69
70 df(1+NBX:NX0+NBX) = DC*dt/dx**2 * &
71 & ( f(2+NBX:NX0+NBX+1) - 2. * f(1+NBX:NX0+NBX) + f(NBX:NX0+NBX-1) )
72 f(1+NBX:NX0+NBX) = f(1+NBX:NX0+NBX) + df(1+NBX:NX0+NBX)
73 f( 1: NBX)=f(NX0+ 1:NX0+ NBX)
74 f(NX0+NBX+1:NX0+NBX+NBX)=f( NBX+1: NBX+NBX)
75 !
76 ! output data
77 !
78 t = t + dt
79 if(mod(it,NOUT).eq.0) then
80 fp(1:NX0+1)= .5*(f(NBX:NX0+NBX)+f(NBX+1:NX0+NBX+1))
81 do ix=1,NX0+1
82 fmin=min(fmin,fp(ix))
83 fmax=max(fmax,fp(ix))
84 end do
85 write(10) t,fp
86 total=sum(f(1+NBX:NX0+NBX))*dx
87 write(6,'(3(a,f12.4),a)') &
88 & 'time = ',t, &
89 & ', total = ',total, &
90 & ', difference = ',(total0-total)/total0*100.,' [%]'
91 end if
92 end do
93 close(10)
94 open(20,file=MINMAXFILE)
95 write(20,minmax)
96 close(20)
97 stop
98 end program diffusion_serial
Grid arrangement in the program is shown in Fig. 1. The domain size is given by LENGTH, and the origin of space coordinate is chosen to be . The number of grids in the domain and outside the domain are NX0 and NBX, respectively, then the total number of grids becomes . The grids outside the domain are used to calculate the value on the boundary grids. To evaluate the value on the boundary grid 1+NBX, two neighboring grids data (NBX and NBX+2) are required. In this case, NBX=1 is sufficient. Figure 1 shows more general case in which more grids outside the domain are necessary for higher order finite differentiations. In the grid arrangement shown here, periodic boundary condition is imposed by identifying the data on grids (1:NBX) [(NX0+NBX+1:NX0+NBX+NBX)] with those on grids (NX0+1:NX0+NBX) [(NBX+1:NBX+NBX)].

This program also evaluates the total mass,
,
which conserves during time evolution for the periodic boundary case. Conservation of is one of a measure of correctness of the numerical solution. The following is a standard output of the program,
# ./diffuse_serial < input.diffuse time = 0.0000, total = 1.0000, difference = 0.0000 [%] time = 0.1221, total = 1.0000, difference = 0.0000 [%] time = 0.2441, total = 1.0000, difference = 0.0000 [%] time = 0.3662, total = 1.0000, difference = 0.0000 [%] time = 0.4883, total = 1.0000, difference = 0.0000 [%] time = 0.6104, total = 1.0000, difference = 0.0000 [%]
"difference [%]" shows the difference of the total mass from the initial value. The solution looks very good! Figure 2 shows time evolution of the solution

Parallelization is to distribute uncorrelated tasks to different processors, and to perform those tasks by multiple processors at the same time. Ideal parallelization can reduce elapse time by although ideal means practically impossible (there should be data communications among processors, unbalance of task distribution, unparallelizable tasks, ans so on). In the problem considered here, procedures to calculate is parallelizable because only values of 's on neighboring grids are necessary. We are going to distribute to some (or many) processors. Let the number of processors and the number of grids in the domain. processor () is responsible for to grids such that
, ,
where is a function which returns larger or , and gives round up of . The following figure shows a schematic drawing how data are distributed.

To calculate , it is necessary to have neighboring data and . However, each processor does not have data required to evaluate the value of the edge grids. For example, suppose and , then, the first to the forth grids are assigned to the first processor, and the fifth grid data is missing to calculate on the forth grid. If the periodic boundary condition is considered, processor 1 also need 12th grid data which is regarded as equivalent to 0th grid. processor 2 needs 4th and 9th grids data at each time step, and processor 3 needs 1st and 8th grids data.

These data transfers among processors are implemented by the help
of the MPI library. The following is a possible modification to
parallelize by the MPI library,
(download sample source code: diffuse_mpi.F90)
1 program diffusion_mpi
2 use mpi
3 implicit none
4 integer :: NX0=512 ! grid points [input]
5 integer :: NT=100000 ! iterations [input]
6 character (len=100) :: DATAFILE ! data file name [input]
7 integer, parameter :: NBX=1 ! boundary grid points
8 integer, parameter :: NOUT=1000 ! output size
9 real, parameter :: DC=1. ! diffusion coefficient
10 real, parameter :: LENGTH=2. ! domain size [-LENGTH/2:LENGTH/2]
11 integer :: nx
12 integer :: ix,it
13 real, allocatable :: f(:),df(:),x(:)
14 real, allocatable :: fp(:),xp(:)
15 real :: dx
16 real :: t,dt
17 real :: pi
18 real :: fmin,fmax
19 real :: total,total0
20 character (len=100), parameter :: MINMAXFILE='minmax.dat'
21 real, parameter :: INFINITY=1.e15
22 integer, parameter :: rootproc=0
23 integer :: myid=rootproc,nps=1,ip,nrange
24 integer, allocatable :: npmin(:),npmax(:),npbmin(:),npbmax(:)
25 integer :: ierr
26 integer :: p_send,p_recv
27 integer :: mpirealtype,tag=0,status(mpi_status_size)
28 namelist /input/ NX0,NT,DATAFILE
29 namelist /minmax/ fmin,fmax
30 call mpi_init(ierr)
31 call mpi_comm_rank(mpi_comm_world,myid,ierr)
32 call mpi_comm_size(mpi_comm_world,nps,ierr)
33 mpirealtype=mpi_real
34 #ifdef DBLE
35 mpirealtype=mpi_double_precision
36 #endif
37 allocate(npmin(0:nps-1),npmax(0:nps-1))
38 allocate(npbmin(0:nps-1),npbmax(0:nps-1))
39 ! input
40 if(myid.eq.rootproc) read(5,input)
41 call mpi_bcast(NX0, 1,mpi_integer, rootproc,mpi_comm_world,ierr)
42 call mpi_bcast(NT, 1,mpi_integer, rootproc,mpi_comm_world,ierr)
43 call mpi_bcast(DATAFILE,100,mpi_character,rootproc,mpi_comm_world,ierr)
44 DATAFILE=trim(DATAFILE)
45 nrange=NX0/nps
46 do ip=0,nps-1
47 npmin(ip)=nrange*ip+1 +NBX
48 npmax(ip)=nrange*(ip+1)+NBX
49 end do
50 npmax(nps-1)=NX0+NBX
51 npbmin(0:nps-1)=npmin(0:nps-1)-NBX
52 npbmax(0:nps-1)=npmax(0:nps-1)+NBX
53 nx=NX0+2*NBX
54 allocate(f(nx),df(nx),x(nx))
55 allocate(fp(NX0+1),xp(NX0+1))
56 pi = 4.*atan(1.)
57 dx = LENGTH/NX0
58 do ix = 1, nx
59 x(ix) = dx*(ix-NBX-.5) - .5*LENGTH
60 end do
61 xp(1:NX0+1)= .5*(x(NBX:NX0+NBX)+x(NBX+1:NX0+NBX+1))
62 dt = .5 * dx * dx / DC
63 t = 0.
64 !
65 ! initialize array
66 !
67 f(1:nx)=0.
68 df(1:nx)=0.
69 !
70 ! initial condition
71 !
72 f(1:nx) = .5*(cos(pi*x(1:nx))+1.)
73 fp(1:NX0+1)= .5*(f(NBX:NX0+NBX)+f(NBX+1:NX0+NBX+1))
74 fmin= INFINITY
75 fmax=-INFINITY
76 do ix=1,NX0+1
77 fmin=min(fmin,fp(ix))
78 fmax=max(fmax,fp(ix))
79 end do
80 !
81 ! write initial data
82 !
83 if(myid.eq.rootproc) then
84 open(10,file=DATAFILE,form='unformatted')
85 write(10) xp
86 write(10) t,fp
87 total=sum(f(1+NBX:NX0+NBX))*dx
88 total0=total
89 write(6,'(3(a,f12.4),a)') &
90 & 'time = ',t, &
91 & ', total = ',total, &
92 & ', difference = ',(total0-total)/total0*100,' [%]'
93 endif
94 !
95 ! main routine
96 !
97 do it = 1, NT
98 df(npmin(myid):npmax(myid)) = DC*dt/dx**2 * &
99 & ( f(npmin(myid)+1:npmax(myid)+1) &
100 & - 2.*f(npmin(myid) :npmax(myid)) &
101 & + f(npmin(myid)-1:npmax(myid)-1) )
102 f(npmin(myid):npmax(myid)) = &
103 & f(npmin(myid):npmax(myid)) + df(npmin(myid):npmax(myid))
104 if(nps.eq.1) then
105 f(npbmin(myid):npmin(myid)-1)=f(npmax(myid)-NBX+1:npbmax(myid))
106 f(npmax(myid)+1:npbmax(myid))=f(npmin(myid):npmin(myid)+NBX-1)
107 else
108 !
109 ! Periodic boundary
110 ! Processor 0 sends f(NBX+1:NBX+NBX) and
111 ! Processor nps-1 receives/stores it in f(NX0+NBX+1:NX0+NBX+NBX)
112 !
113 p_send=mpi_proc_null
114 p_recv=mpi_proc_null
115 if(myid.eq.0) p_send=nps-1
116 if(myid.eq.nps-1) p_recv=0
117 call mpi_send(f(1+NBX),NBX,mpirealtype, &
118 & p_send,tag,mpi_comm_world,ierr)
119 call mpi_recv(f(NX0+NBX+1),NBX,mpirealtype, &
120 & p_recv,tag,mpi_comm_world,status,ierr)
121 !
122 ! Periodic boundary
123 ! Processor nps-1 sends f(NX0+1:NX0+NBX) and
124 ! Processor 0 receives/stores it in f(1:NBX)
125 !
126 p_send=mpi_proc_null
127 p_recv=mpi_proc_null
128 if(myid.eq.0) p_recv=nps-1
129 if(myid.eq.nps-1) p_send=0
130 call mpi_send(f(NX0+1),NBX,mpirealtype, &
131 & p_send,tag,mpi_comm_world,ierr)
132 call mpi_recv(f(1),NBX,mpirealtype, &
133 & p_recv,tag,mpi_comm_world,status,ierr)
134 !
135 ! Shift communications
136 ! send/receive boundary data to right/from left
137 !
138 p_send=myid+1
139 p_recv=myid-1
140 if(myid.eq.0) p_recv=mpi_proc_null
141 if(myid.eq.nps-1) p_send=mpi_proc_null
142 call mpi_send(f(npmax(myid)-NBX+1),NBX,mpirealtype, &
143 & p_send,tag,mpi_comm_world,ierr)
144 call mpi_recv(f(npmin(myid)-NBX),NBX,mpirealtype, &
145 & p_recv,tag,mpi_comm_world,status,ierr)
146 !
147 ! Shift communications
148 ! send/receive boundary data to left/from right
149 !
150 p_send=myid-1
151 p_recv=myid+1
152 if(myid.eq.0) p_send=mpi_proc_null
153 if(myid.eq.nps-1) p_recv=mpi_proc_null
154 call mpi_send(f(npmin(myid)),NBX,mpirealtype, &
155 & p_send,tag,mpi_comm_world,ierr)
156 call mpi_recv(f(npmax(myid)+1),NBX,mpirealtype, &
157 & p_recv,tag,mpi_comm_world,status,ierr)
158 endif
159 !
160 ! output data
161 !
162 t = t + dt
163 if(mod(it,NOUT).eq.0) then
164 if(nps.gt.1) then
165 call mpi_gather( &
166 & f(npmin(myid)),npmax(myid)-npmin(myid)+1,mpirealtype, &
167 & f(1+NBX), npmax(myid)-npmin(myid)+1,mpirealtype, &
168 & rootproc,mpi_comm_world,ierr)
169 p_send=mpi_proc_null
170 p_recv=mpi_proc_null
171 if(myid.eq.rootproc) p_recv=nps-1
172 if(myid.eq.nps-1 ) p_send=rootproc
173 call mpi_send(f(NX0+NBX+1),NBX,mpirealtype, &
174 & p_send,tag,mpi_comm_world,ierr)
175 call mpi_recv(f(NX0+NBX+1),NBX,mpirealtype, &
176 & p_recv,tag,mpi_comm_world,status,ierr)
177 endif
178 if(myid.eq.rootproc) then
179 fp(1:NX0+1)= .5*(f(NBX:NX0+NBX)+f(NBX+1:NX0+NBX+1))
180 do ix=1,NX0+1
181 fmin=min(fmin,fp(ix))
182 fmax=max(fmax,fp(ix))
183 end do
184 write(10) t,fp
185 total=sum(f(1+NBX:NX0+NBX))*dx
186 write(6,'(3(a,f12.4),a)') &
187 & 'time = ',t, &
188 & ', total = ',total, &
189 & ', difference = ',(total0-total)/total0*100.,' [%]'
190 endif
191 end if
192 end do
193 close(10)
194 if(myid.eq.rootproc) then
195 open(20,file=MINMAXFILE)
196 write(20,minmax)
197 close(20)
198 endif
199 call mpi_finalize(ierr)
200 stop
201 end program diffusion_mpi
Let us see what have been changed in the MPI source code (important parts are written in red.) First of all, we must loudly proclaim that we are in a "parallel world". This is done by "use mpi" statement in line 2. The module mpi enables us to access MPI subroutines and predefined parameters. A parallelized part of the MPI source code should be surrounded by the subroutines mpi_init and mpi_finalize. In between those statements, we can use multiple processors. After moving into the parallel world, what each process has to do next is to know who it is (processor rank) [mpi_comm_rank] and the total number of processors [mpi_comm_size]. In lines from 45 to 52, ranges of grids which each processor is responsible for are determined. Using these ranges, tasks to calculate are distributed on processors. As explained earlier, each processor needs to communicate neighboring processors to exchanged neighboring grid data. These communications are performed by subroutine mpi_send and mpi_recv. The subroutine mpi_gather and the following send/recv operations just collect distributed data to one processor for data output. It is easier and safer to force only one processor to do data input/output. That is why the standard input data are read by only one processor and are broadcasted [mpi_bcast] to other processors.
# cpp -DDBLE diffuse_mpi.F90 # mpif90 -r8 -o diffuse_mpi diffuse_mpi.f90
You may consider that it is possible to combine the communications for the periodic boundary (lines 113-120 and 126-133) with the shift communications (lines 139-145 and 150-157) by defining p_send/p_recv properly. This can be done by setting p_recv=nps-1/p_send=0 in line 141-142, p_recv=0/p_send=nps-1 in line 152-153 to absorb lines 108-133 into lines 134-157. This program, however, may not work in some systems. Mpi_send is the blocking communication subroutine which blocks the following operations until the subroutine successfully copies the data to be sent into the system buffer and returns. Top panel of figure 5 shows data flow of send/recv communications on the system which has enough buffer. The send/recv communications succeed as shown in the figure. If the system which does not have enough system buffer (see bottom panel of figure 5), mpi_send called by some processors do not return and other processors should wait for those processes to return. The program does not proceed any further. This situation in which all processors are waiting is called "deadlock."

If we change the order of calling mpi_send and mpi_recv, the situation becomes worse. The program obviously deadlocks in all systems (not depend on size of the system buffer) after all processors call mpi_recv because mpi_recv is also blocking, and returns after receiving the corresponding data.
There are several ways to avoid deadlock. If one processor call mpi_send and the corresponding processor call mpi_recv, this communication succeeds even if the system buffer is not large enough to store all the data to be sent (Figure 6).

Thus, the source code should be modified for even/odd rank processors to call mpi_send/mpi_recv first. There is a subroutine mpi_sendrecv which automatically do this.
Another way to avoid deadlock is to use non-blocking communication subroutines. Mpi_isend and mpi_irecv are the non-blocking subroutines which immediately return. The reason for using blocking communication subroutines is to assure correctness of the data to be communicated. This means, in turn, non-blocking subroutines do not avoid illegal access to the data. This can be easily seen by the behavior of mpi_irecv. If we call mpi_irecv(recvbuf,...) and access recvbuf right after the subroutine call,
call mpi_irecv(recvbuf,....) tmp=recvbuf*recvbuf
recvbuf may not have a correct value since it is not guaranteed that mpi_irecv has already received recvbuf. A similar thing can be occurred to mpi_isend. You may overwrite the data to be sent unexpectedly if you access that data immediately after calling mpi_isend. To wait until mpi_irecv/mpi_isend receives/sends data, we can use subroutine mpi_wait. Alteration to use the non-blocking communication subroutines can be as follows,
call mpi_isend(sendbuf,...,ireq1,...) call mpi_irecv(recvbuf,...,ireq2,...) call mpi_wait(ireq1,...) call mpi_wait(ireq2,...)
Calculation of the total mass (summing up all ) is performed by the rootproc processor in the source code because distributed s are already gathered to rootproc for an output purpose. If we don't need data output, do we still need to gather all the data for the calculation of the total mass? No! We just sum up local sums of in each processor. A global operation (taking a summation of the local sums) just acts on the results of local operations. This type of operations is called reduction. The MPI library provides a subroutine mpi_reduce for reduction operations. Using this subroutine may significantly reduce an amount of data communication in comparison with gathering all if is huge. The summation of in line 185 can be written as follows,
total0=sum(f(npmin(pid):npmax(pid)))*dx call mpi_reduce(total0,total,1,mpirealtype,mpi_sum,rootproc,mpi_comm_world,ierr)
The most important issue for MPI programming is to reduce time for data communications. Since calling an MPI communication subroutine takes time, it can be efficient to decrease the number of MPI subroutine calls. In the example source code, there are three mpi_bcast calls in lines 41-43. If we can combine them into one mpi_bcast call, it may reduce communication time. (In reality, this modification is not efficient at all. This is just an example to show what a derivative type is.)
The MPI library provides a framework to construct a group of data to be communicated. Users can create a data type of a group of data called a derivative type, and use it as a general MPI data type. To construct a derivative type, we have to define elements of the group. Data types, positions, and sizes of the data are necessary informations.

In figure 7, we show an example of a group of data. The group includes two reals (a and b) and one character (c) of length 2. Positions of data are given by displacements relative to the address of the first data (a). Then, the positions of a, b and c are 0, 16, 24, respectively. The following subroutine defines a derivative type (newtype) can be used in the example source code,
1 subroutine inputtype(int1,int2,char1,newtype)
2 use mpi
3 implicit none
4 integer, intent(in) :: int1,int2
5 character (len=100), intent(in) :: char1
6 integer, intent(out) :: newtype
7 integer, parameter :: count=3
8 integer :: i
9 integer :: addr_start
10 integer :: blen(count)
11 integer :: disp(count)
12 integer :: tlst(count)
13 integer :: ierr
14 blen=(/ 1,1,100 /)
15
16 tlst=(/ mpi_integer,mpi_integer,mpi_character /)
17
18 call mpi_address(int1, disp(1),ierr)
19 call mpi_address(int2, disp(2),ierr)
20 call mpi_address(char1,disp(3),ierr)
21 addr_start=disp(1)
22 do i=1,count
23 disp(i)=disp(i)-addr_start
24 end do
25 call mpi_type_struct(count,blen,disp,tlst,newtype,ierr)
26 call mpi_type_commit(newtype,ierr)
27
28 return
29 end subroutine inputtype
The group of data has three elements (int1=NX0, int2=NT, char1=DATAFILE). A size, a data type, and a position of each element are stored in "blen", "tlst", and "disp". Mpi_address is used to determine a byte address of each data. The defined group of data is constructed by mpi_type_struct. A new data type (newtype) becomes available after committed by mpi_type_commit. Finally, we can broadcast the bunch of data (NX0, NT, DATAFILE) by calling only one mpi_bcast using the new data type like,
call mpi_bcast(NX0,1,newtype,rootproc,mpi_comm_world,ierr)
This operation broadcasts (NX0,NT,DATAFILE), not NX0 only.
Download archive (diffuse.tgz)
Just type "make",
# make
then you will get diffuse.ps, and will see what's happening. If you want to convert diffuse.ps to an animation, type "make anim." To change the number of processors,
# make MPI=16
There are some macros to control the behavior of make. Please consult README for more detail.