Merged

28 commits
51c8ece
removing old files and setting todos
weinbe58 Dec 20, 2022
931e670
override SparseArrays Mul!
weinbe58 Dec 28, 2022
e8753b0
updating TODOs
weinbe58 Dec 28, 2022
cd96e34
adding tests
weinbe58 Dec 28, 2022
8e12ae2
adding += operator
weinbe58 Dec 28, 2022
ededb98
Fixed broken .toml
johnzl-777 Dec 28, 2022
4e979f5
Added testing ability to .toml, merge_csr_mv! works with CSC format a…
johnzl-777 Dec 29, 2022
eb4604f
Added @inbounds, use nnz to denote number of nonzeros vs length(nz)
johnzl-777 Dec 29, 2022
b62adb7
Attempted fix for atomic ops, renamed args for merge_csr_mv, add tests
johnzl-777 Dec 29, 2022
f7f46b2
Fixed broken default matrix-matrix, matrix-vector mul, added simple t…
johnzl-777 Dec 29, 2022
45478d1
Added test for matrix x matrix operation using merge_csr_mv!
johnzl-777 Dec 30, 2022
381437f
Fixed issue with zeros, partial fix for AB\alpha operation inside mul…
johnzl-777 Dec 30, 2022
84393bc
fixing issue with mul!
weinbe58 Jan 3, 2023
1a89711
Try to use @view to fix issue with matrix not being mutated
johnzl-777 Jan 3, 2023
8e9a71d
Merge remote-tracking branch 'refs/remotes/origin/csrmv_merge' into c…
johnzl-777 Jan 3, 2023
b6c26cc
Add tests for transpose on square and rectangular matrices
johnzl-777 Jan 3, 2023
182aaa5
Changed unit tests to match new argument ordering
johnzl-777 Jan 3, 2023
35af5fe
adding StridedVector type
weinbe58 Jan 3, 2023
8cd636b
fixed segfault, revised some tests to adjoint/transpose matrix v. pla…
johnzl-777 Jan 3, 2023
b911851
Merge remote-tracking branch 'refs/remotes/origin/csrmv_merge' into c…
johnzl-777 Jan 3, 2023
34eda6b
fixing imports
weinbe58 Jan 3, 2023
366d2e8
rm inbounds no longer overloading SparseArray.mul!
weinbe58 Jan 3, 2023
778d524
Fixed issue with incorrect output and updated tests to always use tra…
johnzl-777 Jan 3, 2023
938ecd7
Add @inbounds
johnzl-777 Jan 3, 2023
c9b8ccc
Removed unnecessary comments, added SparseArrays definition override
johnzl-777 Jan 4, 2023
5217a3c
Reordered imports to alphabetical order
johnzl-777 Jan 4, 2023
b033e3d
added complex-valued unit tests for merge_csr_mv, removed incompatibl…
johnzl-777 Jan 4, 2023
87852a6
Added test for complex and real-valued matrices/vectors/coefficients …
johnzl-777 Jan 4, 2023
12 changes: 12 additions & 0 deletions Project.toml
@@ -1,2 +1,14 @@
name = "ParallelMergeCSR"
uuid = "c4b929cc-6141-4ba4-8849-280a863346e1"
version = "0.1.0"

[deps]
Atomix = "a9b6321e-bd34-4604-b9c9-b65b8de01458"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
184 changes: 179 additions & 5 deletions src/ParallelMergeCSR.jl
@@ -1,13 +1,187 @@
module ParallelMergeCSR


using Atomix
using Base.Threads
using SparseArrays
using LinearAlgebra: Adjoint,adjoint,Transpose,transpose
using SparseArrays: AbstractSparseMatrixCSC,
DenseInputVecOrMat,
getcolptr

mutable struct Coordinate
x::Int
y::Int
end

# Gives the illusion that we are still using 0-based coordinates even though the
# underlying values are 1-based.
#
# diagonal -> i + j = k
# a_len and b_len can stay the same as in the paper
#
# a -> row-end offsets, so really pass in a[2:end]
# b -> "natural" numbers
function merge_path_search(diagonal::Int, a_len::Int, b_len::Int, a, b)
# Diagonal search range (in x coordinate space)
x_min = max(diagonal - b_len, 0)
x_max = min(diagonal, a_len)
# 2D binary-search along diagonal search range
while (x_min < x_max)
pivot = (x_min + x_max) >> 1
if (a[pivot + 1] <= b[diagonal - pivot])
x_min = pivot + 1
else
x_max = pivot
end
end

return Coordinate(
min(x_min, a_len),
diagonal - x_min
)

end
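For intuition, a small worked sketch of how the binary search splits the merge list between two threads. The numbers are invented for illustration, and the calls are qualified with the module name on the assumption that merge_path_search is not exported:

```julia
using ParallelMergeCSR

# Toy CSR-style layout: 3 rows, 5 nonzeros, colptr = [1, 3, 4, 6]
row_end_offsets = [3, 4, 6]          # cp[2:end]
nz_indices = collect(1:5)            # the "natural numbers" list b

ParallelMergeCSR.merge_path_search(0, 3, 5, row_end_offsets, nz_indices)  # Coordinate(0, 0)
ParallelMergeCSR.merge_path_search(4, 3, 5, row_end_offsets, nz_indices)  # Coordinate(1, 3)
ParallelMergeCSR.merge_path_search(8, 3, 5, row_end_offsets, nz_indices)  # Coordinate(3, 5)

# With 8 merge items split across 2 threads (diagonals 0, 4, 8), thread 1 fully
# consumes row 0 (nonzeros 1-2) and partially consumes row 1 (nonzero 3, handed to
# the carry-out fix-up); thread 2 resumes at (1, 3) and consumes row 2 (nonzeros 4-5).
```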

# Internally treats the matrix as if it has been transposed (or conjugate-transposed):
# e.g. if you pass in a 2x3 matrix, it is treated internally as a 3x2, and it acts as
# an adjoint when you pass `adjoint` as the op.
function merge_csr_mv!(α::Number,A::AbstractSparseMatrixCSC, input::StridedVector, output::StridedVector, op)

nzv = nonzeros(A)
rv = rowvals(A)
cp = getcolptr(A)

nnz = length(nzv)

# nrows = length(cp) - 1 can give the wrong number of rows!
nrows = A.n

nz_indices = collect(1:nnz)
row_end_offsets = cp[2:end] # nzval ordering differs between storage formats
num_merge_items = nnz + nrows # preserve the dimensions of the original matrix

num_threads = nthreads()
items_per_thread = (num_merge_items + num_threads - 1) ÷ num_threads

row_carry_out = zeros(eltype(cp), num_threads)
value_carry_out = zeros(eltype(output), num_threads) # value must match output

# Julia thread ids start at 1, so make sure to offset!
@threads for tid in 1:num_threads
diagonal = min(items_per_thread * (tid - 1), num_merge_items)
diagonal_end = min(diagonal + items_per_thread, num_merge_items)

# Get starting and ending thread coordinates (row, nzv)
thread_coord = merge_path_search(diagonal, nrows, nnz, row_end_offsets, nz_indices)
thread_coord_end = merge_path_search(diagonal_end, nrows, nnz, row_end_offsets, nz_indices)

# Consume merge items, whole rows first
running_total = zero(eltype(output))
while thread_coord.x < thread_coord_end.x
@inbounds while thread_coord.y < row_end_offsets[thread_coord.x + 1] - 1
@inbounds running_total += op(nzv[thread_coord.y + 1]) * input[rv[thread_coord.y + 1]]
thread_coord.y += 1
end

@inbounds output[thread_coord.x + 1] += α * running_total
running_total = zero(eltype(output))
thread_coord.x += 1
end

# A thread may end up partially consuming a row.
# Save the result from the partial consumption and do one pass at the end to add it back to the output.
while thread_coord.y < thread_coord_end.y
@inbounds running_total += op(nzv[thread_coord.y + 1]) * input[rv[thread_coord.y + 1]]
thread_coord.y += 1
end

# Save carry-outs
@inbounds row_carry_out[tid] = thread_coord_end.x + 1
@inbounds value_carry_out[tid] = running_total

end

for tid in 1:num_threads
@inbounds if row_carry_out[tid] <= nrows
@inbounds output[row_carry_out[tid]] += α * value_carry_out[tid]
end
end

end
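A usage sketch of the kernel on toy data: because the CSC column pointer is walked as if it were a CSR row pointer, passing op = identity accumulates output .+= α * transpose(A) * x. The qualified call assumes the function is not exported:

```julia
using SparseArrays
using LinearAlgebra
using ParallelMergeCSR

A = sprand(4, 3, 0.5)                # 4x3 CSC matrix
x = rand(4)
y = zeros(3)                         # output must start zeroed; the kernel accumulates into it

ParallelMergeCSR.merge_csr_mv!(2.0, A, x, y, identity)
y ≈ 2.0 * transpose(A) * x           # expected to hold
```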


# C = adjoint(A)Bα + Cβ
# C = transpose(A)Bα + Cβ
# (xA below denotes the lazy Adjoint/Transpose wrapper around A)
for (T, t) in ((Adjoint, adjoint), (Transpose, transpose))
@eval function SparseArrays.mul!(C::StridedVecOrMat, xA::$T{<:Any,<:AbstractSparseMatrixCSC}, B::DenseInputVecOrMat, α::Number, β::Number)
# obtains the original matrix underneath the "lazy wrapper"
A = xA.parent
size(A, 2) == size(C, 1) || throw(DimensionMismatch())
size(A, 1) == size(B, 1) || throw(DimensionMismatch())
size(B, 2) == size(C, 2) || throw(DimensionMismatch())
if β != 1
# preemptively handle β so we just handle C = ABα + C
β != 0 ? SparseArrays.rmul!(C, β) : fill!(C, zero(eltype(C)))
end
for (col_idx, input) in enumerate(eachcol(B))
output = @view C[:, col_idx]
merge_csr_mv!(α, A, input, output, $t)
end
C
# end of @eval macro
end
# end of for loop
end
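A sketch of the call path these methods are intended to intercept, with toy sizes and complex values (in the spirit of the complex-valued tests added in this PR); the dense product serves as the reference:

```julia
using SparseArrays
using LinearAlgebra
using ParallelMergeCSR

A = sprand(ComplexF64, 5, 3, 0.4)
B = rand(ComplexF64, 5, 2)
C = rand(ComplexF64, 3, 2)
C_ref = copy(C)

mul!(C, adjoint(A), B, 1.5 + 0.5im, 2.0 + 0im)   # intended to dispatch to the Adjoint method above
C ≈ adjoint(Matrix(A)) * B * (1.5 + 0.5im) + C_ref * (2.0 + 0im)   # expected to hold
```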


# Mutating functions are suffixed with an exclamation mark.
function atomic_add!(a::AbstractVector{T},b::T) where T <: Real
# Threads.atomic_add! has the signature (x::Atomic{T}, val::T) where T <: ArithmeticTypes,
# so using it would require wrapping each array element in Atomic.
# Atomix.@atomic instead performs the atomic update directly on the elements of a plain array:
# for each element of the vector, atomically add b to it.
for idx in 1:length(a)
Atomix.@atomic a[idx] += b
end

end
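A minimal concurrency sketch (toy values; the qualified call assumes the function is not exported): because each element update is atomic, racing tasks still produce the exact total:

```julia
using ParallelMergeCSR

v = zeros(4)
Threads.@threads for i in 1:1000
    ParallelMergeCSR.atomic_add!(v, 1.0)   # every iteration bumps all four elements by 1.0
end
v == fill(1000.0, 4)                        # expected to hold regardless of thread count
```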

include("types.jl")
include("threaded.jl")
function atomic_add!(a::AbstractVector{T},b::T) where T <: Complex
# reinterpret the complex vector as a 2xN real matrix:
# row 1 holds the real parts, row 2 the imaginary parts
view_a = reinterpret(reshape, real(T),a)
real_b = real(b)
imag_b = imag(b)

# include("gpu.jl")
# include("distributed.jl")
# mutating the view also mutates the underlying array it came from
for idx in 1:size(view_a, 2)
Atomix.@atomic view_a[1, idx] += real_b
Atomix.@atomic view_a[2, idx] += imag_b
end
end
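The reinterpret trick above works because a Vector{Complex{S}} is laid out as interleaved real/imaginary pairs, so the reshaped view is a 2×N real matrix aliasing the same memory. A small sketch with invented values:

```julia
v = ComplexF64[1 + 2im, 3 + 4im]
w = reinterpret(reshape, Float64, v)   # 2x2 view: [1.0 3.0; 2.0 4.0]
w[1, 2] += 10.0                        # updates the real part of v[2]
v[2] == 13.0 + 4.0im                   # true: the view aliases v's memory
```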

function SparseArrays.mul!(C::StridedVecOrMat, A::AbstractSparseMatrixCSC, B::DenseInputVecOrMat, α::Number, β::Number)
size(A, 2) == size(B, 1) || throw(DimensionMismatch())
size(A, 1) == size(C, 1) || throw(DimensionMismatch())
size(B, 2) == size(C, 2) || throw(DimensionMismatch())
nzv = nonzeros(A)
rv = rowvals(A)
if β != 1
# preemptively handle β so we just handle C = ABα + C
β != 0 ? SparseArrays.rmul!(C, β) : fill!(C, zero(eltype(C)))
end
for k in 1:size(C, 2)
@threads for col in 1:size(A, 2)
@inbounds αxj = B[col,k] * α
for j in nzrange(A, col)
@inbounds val = nzv[j]*αxj
@inbounds row = rv[j]
@inbounds out = @view C[row:row, k]
atomic_add!(out,val)
end
end
end
C
end
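A sketch exercising the non-transposed override with toy sizes, comparing against the dense product:

```julia
using SparseArrays
using LinearAlgebra
using ParallelMergeCSR

A = sprand(6, 4, 0.3)
B = rand(4, 2)
C = zeros(6, 2)

mul!(C, A, B, 1.0, 0.0)    # β = 0 zero-fills C, then threads accumulate column contributions
C ≈ Matrix(A) * B          # expected to hold
```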

# end of the module
end
Empty file removed src/threaded.jl
Empty file.
Empty file removed src/types.jl
Empty file.
27 changes: 27 additions & 0 deletions test/atomic_add!.jl
@@ -0,0 +1,27 @@
using Test
using ParallelMergeCSR
using SparseArrays

@testset "atomic_add!" begin
# atomic add with real
@testset "Real" begin
a = rand(Float64, 20)
a_copy = deepcopy(a)

b = 1.25

ParallelMergeCSR.atomic_add!(a, b)
@test a == (a_copy .+ b)
end

# atomic add with complex
@testset "Complex" begin
a = rand(Complex{Float64}, 20)
a_copy = deepcopy(a)

b = 2.5 + 0.8im

ParallelMergeCSR.atomic_add!(a, b)
@test a == (a_copy .+ b)
end
end
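A hypothetical test/runtests.jl that would pick this file up via the [targets] entry in Project.toml; the actual runtests.jl is not shown in this section, so the wiring below is an assumption:

```julia
# Hypothetical test/runtests.jl (not part of this diff)
using Test
using ParallelMergeCSR

@testset "ParallelMergeCSR.jl" begin
    include("atomic_add!.jl")
end
```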