Scientific Computing, TU Berlin, WS 2019/2020, Lecture 26

Jürgen Fuhrmann, WIAS Berlin

Multithreading in Julia

  • Start a function as a Task on an available thread with Threads.@spawn
  • wait(task): wait for the completion of the task
  • fetch(task): wait for the completion of the task and retrieve its result
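A minimal sketch of this pattern (assuming Julia was started with at least one thread):

```julia
# Schedule a small computation as a task on some available thread,
# then wait for its completion and retrieve the result.
t = Threads.@spawn sum(1:100)
fetch(t)  # 5050
```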

Packages, setup

In [1]:
using BenchmarkTools
using LinearAlgebra
using Printf
using PyPlot

injupyter()=isdefined(Main, :IJulia) && Main.IJulia.inited
Out[1]:
injupyter (generic function with 1 method)
  • In order to allow multithreading, one has to start Julia with the environment variable JULIA_NUM_THREADS set to the desired number of threads
  • Figure out the number of threads
In [2]:
Threads.nthreads()
Out[2]:
4

Starting tasks using Threads.@spawn

A task which just returns the number of the thread executing it

In [3]:
function mytask()
    return Threads.threadid()
end
Out[3]:
mytask (generic function with 1 method)
In [4]:
function threads_hello(;ntasks=10)
    println("number of tasks: $(ntasks)")
    tasks=[Threads.@spawn mytask() for i=1:ntasks]
    for i=1:length(tasks)
        ithd=fetch(tasks[i])
        println("task #$(i) was executed on thread #$(ithd)")
    end
end
Out[4]:
threads_hello (generic function with 1 method)
In [5]:
threads_hello(ntasks=10)
number of tasks: 10
task #1 was executed on thread #2
task #2 was executed on thread #2
task #3 was executed on thread #1
task #4 was executed on thread #3
task #5 was executed on thread #4
task #6 was executed on thread #1
task #7 was executed on thread #3
task #8 was executed on thread #4
task #9 was executed on thread #3
task #10 was executed on thread #2

Multithreaded dot product calculation

  • An interesting example, as the partial results have to be collected into one variable
  • Start by subdividing the loop range into (nearly) equal parts
In [6]:
function partition(N,ntasks)
    loop_begin=zeros(Int64,ntasks)
    loop_end=zeros(Int64,ntasks)
    for itask=1:ntasks
        ltask=Int(floor(N/ntasks))
        loop_begin[itask]=(itask-1)*ltask+1
        if itask==ntasks # adjust last task length
            ltask=N-(ltask*(ntasks-1))
        end
        loop_end[itask]=loop_begin[itask]+ltask-1
    end
    return (loop_begin,loop_end)
end
Out[6]:
partition (generic function with 1 method)

Check this

In [7]:
partition(100000,3)
Out[7]:
([1, 33334, 66667], [33333, 66666, 100000])

Calculate the part of the scalar product from index n0 to n1

In [8]:
function mydot(a,b,n0,n1)
    result=0.0
    for i=n0:n1
        result+=a[i]*b[i]
    end
    result
end
Out[8]:
mydot (generic function with 1 method)

Calculate the scalar product in parallel

In [9]:
function threaded_mydot(a,b,N,ntasks)
    loop_begin,loop_end=partition(N,ntasks)
    tasks=[Threads.@spawn mydot(a,b,loop_begin[i],loop_end[i]) for i=1:ntasks]
    return mapreduce(fetch,+,tasks)
end
Out[9]:
threaded_mydot (generic function with 1 method)

Compare times, check accuracy

In [10]:
function test_threaded_mydot(;N=400000, ntasks=4)
    a=rand(N)
    b=rand(N)
    res_s=@btime mydot($a,$b,1,$N)
    res_p=@btime threaded_mydot($a,$b,$N,$ntasks)
    res_s ≈ res_p
end
Out[10]:
test_threaded_mydot (generic function with 1 method)
In [11]:
test_threaded_mydot(N=400000, ntasks=4)
  353.982 μs (0 allocations: 0 bytes)
  97.785 μs (44 allocations: 3.64 KiB)
Out[11]:
true

Multithreaded dot product calculation based on fork-join model

The fork-join model promises that we can skip the scheduling "by hand"

In [12]:
function forkjoin_mydot_primitive(a,b)
    result=0.0
    N=length(a)
    Threads.@threads for i=1:N
        result+=a[i]*b[i]
    end
    result
end
Out[12]:
forkjoin_mydot_primitive (generic function with 1 method)
In [13]:
function test_forkjoin_mydot_primitive(;N=400000)
    a=rand(N)
    b=rand(N)
    res_s=@btime mydot($a,$b,1,$N)
    res_p=@btime forkjoin_mydot_primitive($a,$b)
    res_s ≈ res_p
end
Out[13]:
test_forkjoin_mydot_primitive (generic function with 1 method)
In [14]:
test_forkjoin_mydot_primitive(N=400000)
  356.562 μs (0 allocations: 0 bytes)
  7.380 ms (800031 allocations: 12.21 MiB)
Out[14]:
false

What was wrong ?

  • In the parallel loop, there is no control over the access to result
  • A possible scenario:
    • Thread 1 wants to perform result+=x
    • Thread 2 wants to perform result+=y
    1. Thread 1 reads result and gets its actual value r0
    2. Thread 2 reads result and gets the same value r0
    3. Thread 1 performs its operation and writes back r0+x
    4. Thread 2 performs its operation and writes back r0+y
    • In the end we have result=r0+y instead of the correct value r0+x+y: the update by Thread 1 is lost
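The lost-update scenario can be reproduced with a small sketch (race_demo is a hypothetical helper, not part of the lecture code):

```julia
# Increment a shared counter from a multithreaded loop without any
# synchronization. Each += is a read-modify-write, so with more than
# one thread, concurrent updates can overwrite each other.
function race_demo(n)
    counter = 0
    Threads.@threads for i = 1:n
        counter += 1   # unprotected read-modify-write
    end
    counter
end
```

With several threads, race_demo(10^6) typically returns a value smaller than 10^6; with a single thread it returns exactly n, since no updates can interleave.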

Atomic variables

  • Atomic variables are protected from unsynchronized updates:
  • Thread 2 has to wait until Thread 1 is done with its operation
  • This is expensive due to the necessary synchronization between the threads
In [15]:
function forkjoin_mydot_atomic(a,b)
    result = Threads.Atomic{Float64}(0.0)
    N=length(a)
    Threads.@threads for i=1:N
        Threads.atomic_add!(result,a[i]*b[i])
    end
    return result[]
end
Out[15]:
forkjoin_mydot_atomic (generic function with 1 method)
In [16]:
function test_forkjoin_mydot_atomic(;N=400000)
    a=rand(N)
    b=rand(N)
    res_s=@btime mydot($a,$b,1,$N)
    res_p=@btime forkjoin_mydot_atomic($a,$b)
    res_s ≈ res_p
end
Out[16]:
test_forkjoin_mydot_atomic (generic function with 1 method)
In [17]:
test_forkjoin_mydot_atomic(N=400000)
  357.617 μs (0 allocations: 0 bytes)
  17.806 ms (31 allocations: 3.05 KiB)
Out[17]:
true

... at least it is correct

Reduction variables

  • Our task-based result was correct, because each task accumulated into its own variable
  • Transfer this concept to the fork-join model
  • Introduce a reduction variable
In [18]:
function forkjoin_mydot_reduction(a,b)
    N=length(a)
    result=zeros(Threads.nthreads())
    Threads.@threads for i=1:N
        ithd=Threads.threadid()
        result[ithd]+=a[i]*b[i]
    end
    return sum(result)
end

function test_forkjoin_mydot_reduction(;N=400000)
    a=rand(N)
    b=rand(N)
    res_s=@btime mydot($a,$b,1,$N)
    res_p=@btime forkjoin_mydot_reduction($a,$b)
    res_s ≈ res_p
end
Out[18]:
test_forkjoin_mydot_reduction (generic function with 1 method)
In [19]:
test_forkjoin_mydot_reduction(N=400000)
  351.377 μs (0 allocations: 0 bytes)
  265.126 μs (31 allocations: 3.14 KiB)
Out[19]:
true
  • Results hint at the experimental character of the implementation
  • Handling of the implicit barriers can be a problem

Schönauer vector triad:

  • d[i]=a[i]+b[i]*c[i]
  • Vary the length of the arrays
  • -> exposes memory bandwidth limitations
In [20]:
function vtriad(N,nrepeat)

    a = Array{Float64,1}(undef,N)
    b = Array{Float64,1}(undef,N)
    c = Array{Float64,1}(undef,N)
    d = Array{Float64,1}(undef,N)

    Threads.@threads for i=1:N
        a[i]=i
        b[i]=N-i
        c[i]=i
        d[i]=-i
    end

    t_scalar=@elapsed begin
        @inbounds @fastmath for j=1:nrepeat
            for i=1:N
                d[i]=a[i]+b[i]*c[i]
            end
        end
    end

    t_parallel=@elapsed begin
        for j=1:nrepeat
            Threads.@threads for i=1:N
                @inbounds @fastmath  d[i]=a[i]+b[i]*c[i]
            end
        end
    end
    GFlops=N*nrepeat*2.0/1.0e9
    @printf("% 10d % 10.3f % 10.3f % 10.3f\n", N, GFlops/t_scalar,GFlops/t_parallel,t_scalar/t_parallel)
    return [N, GFlops/t_scalar,GFlops/t_parallel,t_scalar/t_parallel]
end
Out[20]:
vtriad (generic function with 1 method)
In [21]:
function run_triad()
    # Approximate number of FLOPs per measurement
    flopcount=5.0e8
    # Smallest array size
    N0=1000
    # Data points per decade (of array size)
    ppdec=8
    # Number of array size increases
    nrun=40
    # File header
    @printf("# nthreads=%d\n",Threads.nthreads())
    @printf("#        N   S_GFlops/s  P_GFlops/s  speedup\n")
    # Loop over measurements
    N=N0
    result=zeros(4,nrun)
    for irun=0:nrun-1
        # Have exact powers of 10 in the measurement
        if (irun%ppdec==0)
            N=N0
            N0*=10
        end
        # Calculate number of repeats so that the overall effort stays constant
        nrepeat=flopcount/N
        result[:,irun+1].=vtriad(Int(ceil(N)),Int(ceil(nrepeat)))
        N=N*10^(1.0/ppdec)
    end
    PyPlot.figure(1)
    PyPlot.semilogx(result[1,:], result[2,:])
    PyPlot.grid()
    PyPlot.ylim(1,20)
    PyPlot.xlabel("Array Size")
    PyPlot.ylabel("GFlops/s")
end
Out[21]:
run_triad (generic function with 1 method)
In [22]:
run_triad()
# nthreads=4
#        N   S_GFlops/s  P_GFlops/s  speedup
      1000     17.132      0.230      0.013
      1334      8.025      0.309      0.038
      1779      7.926      0.406      0.051
      2372      8.473      0.538      0.063
      3163      8.258      0.722      0.087
      4217      8.292      0.944      0.114
      5624      7.830      1.229      0.157
      7499      7.379      1.586      0.215
     10000      5.122      2.053      0.401
     13336      4.525      2.601      0.575
     17783      4.264      3.321      0.779
     23714      4.225      4.021      0.952
     31623      4.121      4.867      1.181
     42170      4.170      5.648      1.354
     56235      4.235      6.237      1.473
     74990      4.033      7.526      1.866
    100000      4.311      8.722      2.023
    133353      3.989      9.148      2.293
    177828      3.989     10.149      2.545
    237138      3.838      9.862      2.570
    316228      3.549     10.265      2.892
    421697      2.801      4.665      1.665
    562342      2.442      3.492      1.430
    749895      2.242      2.433      1.085
   1000000      2.003      2.267      1.132
   1333522      1.895      2.040      1.076
   1778280      1.756      1.812      1.032
   2371374      1.695      1.833      1.082
   3162278      1.534      1.723      1.123
   4216966      1.565      1.718      1.098
   5623414      1.574      1.693      1.076
   7498943      1.560      1.684      1.079
  10000000      1.509      1.515      1.004
  13335215      1.536      1.660      1.081
  17782795      1.470      1.654      1.125
  23713738      1.513      1.610      1.064
  31622777      1.535      1.655      1.078
  42169651      1.494      1.623      1.086
  56234133      1.520      1.644      1.082
  74989421      1.490      1.620      1.087
Out[22]:
PyObject Text(24.000000000000007, 0.5, 'GFlops/s')

This notebook was generated using Literate.jl.