Start async task under our application Supervision tree
Inside your application.ex
file we will add our task supervisor with the name MyApp.TaskSupervisor
MyApp.Application
is the module where we essentially describe in what order BEAM should start our elixir application and it's the place where you describe your worker or supervisor process et.al.
defmodule MyApp.Application
use Application
def start(_type, _args) do
childern = [
...
{Task.Supervisor, name: MyApp.TaskSupervisor},
...
]
opts = [strategy: :one_for_one, name: MyApp.TaskSupervisor]
Supervisor.start_link(children, opts)
end
Now we have added our MyApp.TaskSupervisor to our supervision tree and let us spawn new task processes to process our async tasks of sending an email to a list of users concurrently.
In this post, we will be using the function Task.Supervisor.async_stream/4
to process a list of data that could be any enumerable in our case it would be a list of users.
users_list = [1..100]
To process our list of users the MyApp.Supervisor
will spawn the task processes by default to the number of schedular available on your VM that's equal to the number of cores you have on your machine in my case with M1 pro that's 10 cores so my VM will have created 10 schedular (you could check with System.schedulers_online/0
function to find the number scheduler you have online).
You can change the number of processes created by Task.Supervisor.async_stream/4
by passing the option of max_concurrency: n
where n could be an arbitrary number of processes to be created for processing the list.
Task.Supervisor.async_stream(MyApp.TaskSupervsior, data, work)
Now for each user in our users_list
our Task.Supervisor.async_stream/4
will invoke the work/1
anonymous function and start processing that could be sending an email or some other expensive task or batch inserting data into our database or making a network call.
work = fn user_id ->
IO.puts "sending email to #{user_id}"
Process.sleep(1000)
IO.puts "email sent to #{user_id}"
end
user_list = 1..100
Task.Supervisor.async_stream(MyApp.TaskSupervsior, data, work) |> Enum.count()
Since async_stream/4
produces a stream which are lazy so kick start processing our user_list I'm passing it to Enum.count/1
which returns the total number of users.
References:
https://hexdocs.pm/elixir/1.12/Task.Supervisor.html#async_stream/6
https://elixir-lang.org/getting-started/enumerables-and-streams.html