WEBVTT

00:00.000 --> 00:13.400
Excellent. Thank you very much. So now we can start with the talk. Rafael is going to talk

00:13.400 --> 00:21.000
about effortless distributed computing in Python. So take it away.

00:21.000 --> 00:29.000
Can you hear me well? Fine. Okay. So my name is Rafael. I'm a software engineer and

00:29.000 --> 00:37.000
for the past few months I contributed to a few new libraries, and I would like

00:37.000 --> 00:43.000
to showcase them. So all the libraries are related to distributed computing.

00:43.000 --> 00:49.000
So I will present the libraries. The first one, we named it Scaler. It's a

00:49.000 --> 00:55.000
kind of distributed system for Python. You can send it Python tasks and it will

00:55.000 --> 01:01.000
dispatch them to some workers, on different computers, do the computation, and return the result.

01:01.000 --> 01:05.000
On top of that we made Parfun. It's a very small library that you can use to

01:05.000 --> 01:10.000
do map-reduce. I will explain what map-reduce is. And the last one is

01:10.000 --> 01:17.000
Pargraph. It's another library that we developed to do distributed graph

01:17.000 --> 01:24.000
computation. A graph is when you have dependencies between tasks. So we start

01:24.000 --> 01:29.000
with Scaler. This is really the core of what we did. It looks a bit like Dask, for

01:29.000 --> 01:38.000
those who know it. So, since Python 3.2 you can use the

01:38.000 --> 01:45.000
ProcessPoolExecutor to submit tasks to different Python processes on your

01:45.000 --> 01:50.000
machine to do some kind of parallel computing. So this is an example. You start

01:51.000 --> 01:57.000
four workers in this case. I'm not going to use all of them. And you submit two

01:57.000 --> 02:03.000
square-root calls with different parameters. And the executor will not

02:03.000 --> 02:07.000
return the result immediately; it returns a future. So a future is just an object

02:07.000 --> 02:13.000
that will at some point contain the result. And then you can just print

02:13.000 --> 02:19.000
and use the result. But Python will block if the result is not ready yet. So

02:19.000 --> 02:25.000
if the computation that happens on another process hasn't finished yet. So

02:25.000 --> 02:31.000
it's an easy way to do parallelization in Python.
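
NOTE
A minimal sketch of the kind of example being described; this is my reconstruction, not the exact slide code:
import math
from concurrent.futures import ProcessPoolExecutor
# On Windows/macOS this must run under `if __name__ == "__main__":`.
with ProcessPoolExecutor(max_workers=4) as executor:
    # submit() returns immediately with a Future, not the result itself.
    future_a = executor.submit(math.sqrt, 16.0)
    future_b = executor.submit(math.sqrt, 2.0)
    # result() blocks until the other process has finished the computation.
    print(future_a.result(), future_b.result())  # 4.0 1.4142...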

02:31.000 --> 02:37.000
And what we wanted was to not only send tasks to different cores, but also to different

02:37.000 --> 02:43.000
machines. So we created an infrastructure. Instead of using the

02:43.000 --> 02:50.000
ProcessPoolExecutor, you use our client that connects to a cluster. And you can submit tasks

02:50.000 --> 02:54.000
the same way. So it's totally transparent. It will serialize the function,

02:54.000 --> 02:59.000
the arguments, and then fetch back the result. It's similar to Dask; I can

02:59.000 --> 03:04.000
talk about the differences after the talk if some of you are interested.
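
NOTE
A hypothetical sketch of the client usage being described; the import path, address scheme, and Client signature here are illustrative assumptions, not necessarily the library's exact API:
# from scaler import Client   # assumed import path
# with Client(address="tcp://scheduler-host:2345") as client:
#     # Same executor-style interface: the function and its arguments are
#     # serialized, shipped to a worker machine, and the result fetched back.
#     future = client.submit(pow, 2, 10)
#     print(future.result())  # 1024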

03:04.000 --> 03:08.000
So the architecture is like this. You've got the scheduler, which is the central point

03:08.000 --> 03:13.000
of our system. And then you can have as many workers as you want that are

03:13.000 --> 03:19.000
on different machines. And the scheduler is responsible for accepting functions

03:19.000 --> 03:23.000
and tasks from the client and redirecting them to the workers that have

03:23.000 --> 03:30.000
resources available. So I'd like to show a small example. We are going

03:30.000 --> 03:36.000
to make a parallel implementation of the Monte Carlo approximation of pi.

03:36.000 --> 03:41.000
So the approximation of pi using Monte Carlo is basically: you take a

03:41.000 --> 03:46.000
square and you put random points. You generate random points on that square.

03:46.000 --> 03:51.000
And you count the points that fall inside the circle that is inscribed in

03:51.000 --> 03:57.000
this square. In the example on the left, if you count the points that

03:57.000 --> 04:01.000
are in green, inside the circle, and divide that by the total number

04:01.000 --> 04:05.000
of points, and multiply that by four, you will get an approximation of

04:05.000 --> 04:10.000
pi. So 3.2 is pretty close already. The more points you add, the closer to pi

04:10.000 --> 04:15.000
you get. So this is the example I'm going to use in the next slides.
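
NOTE
The estimator being described, for reference: pi is approximately 4 * (points inside the circle) / (total points), because a circle of radius r inscribed in a square of side 2r covers pi r^2 / (2r)^2 = pi/4 of the square's area.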

04:15.000 --> 04:20.000
So if you want to do that in Python, first you have to define when a point is

04:20.000 --> 04:24.000
inside the circle. So you take the trigonometric equation of the circle.

04:24.000 --> 04:28.000
You check whether the point is inside the circle. It doesn't matter much; it's just

04:28.000 --> 04:32.000
an example. And then you do the Monte Carlo estimation. So you generate

04:32.000 --> 04:39.000
random coordinates, so x's and y's. And you filter the points that are

04:39.000 --> 04:44.000
inside the circle. And you count them. You multiply that by four and divide it

04:44.000 --> 04:49.000
by the number of points. And you get an estimate of pi.

04:49.000 --> 04:53.000
So this is the function we run. The more points you add, the closer to pi you

04:53.000 --> 05:00.000
get. And this is the sequential version. If you run it with one hundred

05:00.000 --> 05:05.000
thousand points, it's quite slow, maybe a few seconds, but it gets close to pi.
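
NOTE
A minimal sketch of the sequential version being described, assuming a unit circle inscribed in the [-1, 1] square; this is my reconstruction of the slide:
import random
def is_in_circle(x: float, y: float) -> bool:
    # Circle equation: the point is inside the unit circle around the origin.
    return x * x + y * y <= 1.0
def estimate_pi(n_points: int) -> float:
    # Generate random coordinates and keep the points that fall in the circle.
    inside = sum(
        is_in_circle(random.uniform(-1.0, 1.0), random.uniform(-1.0, 1.0))
        for _ in range(n_points)
    )
    # Multiply by four and divide by the number of points.
    return 4.0 * inside / n_points
print(estimate_pi(100_000))  # roughly 3.14; more points, closer to pi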

05:05.000 --> 05:11.000
So, what if we would like to scale that further, and first do it on multiple

05:11.000 --> 05:19.000
cores and then on multiple machines? So we're going to make another function

05:19.000 --> 05:24.000
that will use the first one. And what this function will do is that it will

05:24.000 --> 05:30.000
split the number of points that you want to generate, in this case by one hundred.

05:30.000 --> 05:36.000
And for each of these subsets of points, it will start a task. So this is here,

05:36.000 --> 05:43.000
the submit. It will start 100 tasks that each get one hundredth of the number of

05:43.000 --> 05:52.000
points. Then you average the results; this sum here is doing the averaging.
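
NOTE
A sketch of the parallel wrapper being described, reusing estimate_pi from the note above; `executor` can be the built-in ProcessPoolExecutor or any client with the same submit interface:
def estimate_pi_parallel(executor, n_points: int, n_tasks: int = 100) -> float:
    # Split the points into n_tasks sub-tasks and submit each one.
    futures = [
        executor.submit(estimate_pi, n_points // n_tasks)
        for _ in range(n_tasks)
    ]
    # Average the partial estimates to get the final estimate.
    return sum(f.result() for f in futures) / n_tasks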

05:52.000 --> 05:57.000
Now, all this compared to the sequential one: the sequential one, if you run it on one

05:57.000 --> 06:03.000
billion points, it will take about seven minutes. So it's quite slow. If you use the

06:03.000 --> 06:10.000
Python built-in executor with eight cores, so this laptop, you will get it in

06:10.000 --> 06:17.000
about fifty-seven seconds instead of seven minutes. So it's already way faster.

06:17.000 --> 06:22.000
And if you use our Scaler client, this is on a big cluster, we have

06:22.000 --> 06:26.000
thirty-five cores on that one, you see that the computation takes twelve

06:26.000 --> 06:31.000
seconds. So it's thirty-five times faster than the sequential one. So this is

06:31.000 --> 06:37.000
basically how you can do parallel computation with Scaler. We have additional

06:37.000 --> 06:43.000
tooling. This one is equivalent to htop, so you can see the status of the cluster.

06:43.000 --> 06:48.000
All the cores, you know, how much CPU each worker has, how many

06:48.000 --> 06:53.000
clients are connected to the cluster. And you also have the status of the

06:53.000 --> 06:59.000
scheduler, which is really the single point of failure of our infra. We have

06:59.000 --> 07:05.000
failure recovery, which means that if a worker dies while it is computing a task,

07:05.000 --> 07:09.000
the scheduler will immediately resubmit all the tasks that the worker was

07:09.000 --> 07:15.000
responsible for to available workers. We have also dynamic load

07:15.000 --> 07:20.000
balancing. Like if one worker has too many tasks and just cannot

07:20.000 --> 07:24.000
make progress, which might happen if some of your tasks take way longer to compute

07:24.000 --> 07:28.000
than others, it will redistribute all the tasks that the worker

07:28.000 --> 07:35.000
accumulated to available workers. So that's it for Scaler. And on top of that

07:35.000 --> 07:40.000
we made a library that you can use to do map reduce. So I'm going to

07:40.000 --> 07:48.000
explain what map-reduce is. So, another example: you want to count

07:48.000 --> 07:53.000
the words that you have inside a text. So you've got this count-words

07:53.000 --> 07:58.000
function. It accepts a list of lines, so every sentence is

07:58.000 --> 08:04.000
an item in the list. And you just go through all the lines and all the

08:04.000 --> 08:08.000
words, and you just accumulate the counts in a dictionary.
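
NOTE
A minimal sketch of the count-words function being described, using a Counter; this is my reconstruction of the slide:
from collections import Counter
def count_words(lines: list[str]) -> Counter:
    # Each item in the list is one line/sentence of the text.
    counts: Counter = Counter()
    for line in lines:
        # Go through all the words and accumulate the counts.
        counts.update(line.lower().split())
    return counts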

08:08.000 --> 08:13.000
Then when you run it on a small file, you will get, obviously, that the most common words

08:13.000 --> 08:21.000
are "the" and then "of". But what if we want to do this on a very large

08:21.000 --> 08:26.000
data set, something that is gigabytes? And what if we want the result

08:26.000 --> 08:31.000
to come back fast? You can use the map-reduce pattern. So map-reduce is

08:31.000 --> 08:36.000
basically: you take the input, you split it into different batches

08:36.000 --> 08:41.000
or partitions, you process all these partitions or batches

08:41.000 --> 08:46.000
separately, and then you have a function that combines the results. So

08:46.000 --> 08:51.000
Parfun just helps you do that. It's a decorator. You

08:51.000 --> 08:54.000
take the original function; you don't change anything in the function that

08:54.000 --> 08:58.000
you have. You just say: okay, I've got this function, I want to

08:58.000 --> 09:05.000
parallelize it, I want to chunk the data into batches. So in this case,

09:05.000 --> 09:09.000
we have predefined functions for lists. So the first argument of the

09:09.000 --> 09:15.000
decorator says: I want the input data to be split, whatever

09:15.000 --> 09:21.000
the size is, in batches of lines. And then I want to

09:21.000 --> 09:24.000
execute the function, and finally I will combine the results with a

09:24.000 --> 09:28.000
sum. You can sum Counters and you will get the total

09:28.000 --> 09:34.000
count, which is the final result.
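
NOTE
To make the pattern concrete, here is a tiny self-contained stand-in for what such a decorator does (split, process batches, combine); this is my simplified toy, not Parfun's actual API, and unlike the real library it uses a fixed batch size and runs the batches sequentially:
import functools
from collections import Counter
def map_reduce(batch_size: int, combine):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(items):
            # The real library would submit each batch to a worker instead.
            batches = (
                items[i:i + batch_size] for i in range(0, len(items), batch_size)
            )
            return combine(fn(batch) for batch in batches)
        return wrapper
    return decorator
@map_reduce(batch_size=10_000, combine=lambda parts: sum(parts, Counter()))
def count_words(lines):
    counts = Counter()
    for line in lines:
        counts.update(line.lower().split())
    return counts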

09:34.000 --> 09:40.000
So by doing this, you can call the function transparently with a way larger file. And actually

09:40.000 --> 09:44.000
it will run on multiple computers if you're using the Scaler

09:44.000 --> 09:47.000
backend. And something that is very important with

09:47.000 --> 09:52.000
Parfun is that you don't specify the size of the batches.

09:52.000 --> 09:57.000
You don't specify how you're going to split the... I mean,

09:57.000 --> 10:01.000
you're not going to specify the size of the small tasks. So you

10:01.000 --> 10:05.000
don't know how many tasks will be created. Because

10:05.000 --> 10:10.000
finding the optimal batch size for a computation is quite hard.

10:10.000 --> 10:14.000
Like, if it is too small, so if you create too many tasks,

10:14.000 --> 10:19.000
you will spend a lot of time just trying to transfer

10:19.000 --> 10:23.000
data, communicating, and doing synchronization stuff,

10:23.000 --> 10:27.000
maybe IPC if you're running on a single machine. And if your

10:27.000 --> 10:31.000
sub-tasks are too large, like if you split the data into

10:31.000 --> 10:34.000
large chunks, you will not get that much

10:34.000 --> 10:38.000
parallelization. So you want something that is smart about that.

10:38.000 --> 10:41.000
So we use machine learning. When the decorator runs the function,

10:41.000 --> 10:45.000
this is the debugging information here, it will

10:45.000 --> 10:50.000
automatically deduce what the optimal size of the

10:50.000 --> 10:55.000
partitions or batches is. So in this case, it deduced after

10:55.000 --> 11:01.000
some tries that computing the function in batches of

11:01.000 --> 11:08.000
1,660,600 lines is actually quite efficient.

11:08.000 --> 11:11.000
By doing that, you will only spend 5% of the time doing

11:11.000 --> 11:16.000
communication and IPC. And you get a nice speedup.

11:16.000 --> 11:20.000
So this is totally transparent. And that's probably the main feature of

11:20.000 --> 11:24.000
our map-reduce library.

11:26.000 --> 11:30.000
Lastly, we have Pargraph. It's, again, another library

11:30.000 --> 11:34.000
that you can use on top of Scaler. And this one is really

11:34.000 --> 11:40.000
designed to do graph computation. I've got a third example.

11:40.000 --> 11:47.000
So you want to generate a report. And your report code, it opens

11:47.000 --> 11:51.000
a file, it reads the file, it pre-processes the file.

11:51.000 --> 11:56.000
But it also goes to a database. And it will extract

11:56.000 --> 12:01.000
the email addresses. And then finally, it will send

12:01.000 --> 12:06.000
the report to all these email addresses. So if you are into

12:06.000 --> 12:10.000
distributed computing, you will notice that actually you can

12:10.000 --> 12:15.000
run these two blocks of code

12:15.000 --> 12:19.000
concurrently, in a parallelized way, because they're not

12:19.000 --> 12:25.000
dependent. They're only dependent here. So what we have is

12:25.000 --> 12:33.000
two additional decorators that allow you to define graphs in a

12:33.000 --> 12:40.000
declarative way in Python. So first we have the

12:40.000 --> 12:45.000
delayed one. With the delayed one, you define what your

12:45.000 --> 12:50.000
graph leaves are. So, the smallest functions that you don't

12:50.000 --> 12:54.000
want to split further. And you just add the delayed decorator to all the

12:54.000 --> 12:59.000
sub-tasks. So remember, we had this read-data-file, read-

12:59.000 --> 13:03.000
database-table, extract-emails, all these functions, all

13:03.000 --> 13:09.000
the ones that are in green and orange. You just say that

13:09.000 --> 13:13.000
these are the leaves of your graph. And then the parent

13:13.000 --> 13:15.000
function, the one that is calling the other functions, you

13:15.000 --> 13:21.000
say that it is a graph. And by doing that, when you

13:21.000 --> 13:24.000
request the graph representation of this

13:24.000 --> 13:28.000
computation, it will actually not really run the

13:28.000 --> 13:31.000
functions; it will just inspect them. And it will be

13:31.000 --> 13:34.000
able to generate a graph that represents the

13:34.000 --> 13:38.000
computation. And here we see that we have two paths that we can

13:38.000 --> 13:42.000
take to do the computation in a distributed way. And once

13:42.000 --> 13:47.000
you have this graph, you can export it and run it on

13:47.000 --> 13:51.000
Scaler. So we have a special function for that. You can also

13:51.000 --> 13:55.000
run it on Dask; it is the same format.
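
NOTE
To make the idea concrete, here is a toy version of the mechanism (recording calls as Dask-style task tuples instead of running them); this is my simplified stand-in, not Pargraph's actual implementation, and the file/argument names are illustrative:
def delayed(fn):
    # Instead of executing, return a (function, *args) task tuple,
    # which is exactly the Dask graph convention for a node.
    def wrapper(*args):
        return (fn, *args)
    return wrapper
@delayed
def read_data_file(path): ...
@delayed
def extract_emails(db): ...
@delayed
def send_report(data, emails): ...
# The two leaf nodes below have no dependency on each other,
# so a scheduler is free to execute them in parallel.
graph = {
    "data": read_data_file("input.csv"),        # hypothetical file name
    "emails": extract_emails("db-connection"),  # hypothetical argument
    "sent": send_report("data", "emails"),      # depends on both branches
}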

13:55.000 --> 13:59.000
And that is basically the idea. I am a little bit

13:59.000 --> 14:04.000
early. So I have a lot of time for questions.

14:04.000 --> 14:06.000
Thank you.

14:06.000 --> 14:10.000
Thank you so much.

14:10.000 --> 14:15.000
Just before the questions: we released all of that a few

14:15.000 --> 14:22.000
months ago under the Apache 2.0 license. And we welcome

14:22.000 --> 14:26.000
contributors and issues, if you have any;

14:26.000 --> 14:30.000
it is all on GitHub.

14:30.000 --> 14:33.000
Thanks for the presentation. For the

14:33.000 --> 14:36.000
graph representation, can you represent more

14:36.000 --> 14:39.000
advanced graph structures? Including what? Can you

14:39.000 --> 14:42.000
represent more complicated graph structures?

14:43.000 --> 15:08.000
[inaudible exchange with the audience]

15:08.000 --> 15:11.000
So this one is a very simple one. You can have

15:11.000 --> 15:17.200
as many levels as you want. What the graph library is doing is that it's

15:17.200 --> 15:22.520
it's overriding all the Python operators. So when you run the function it actually

15:22.520 --> 15:27.400
doesn't run the function, it just generates a kind of syntax tree, and

15:27.400 --> 15:34.080
we extract the graph from that. So unless an operator is not correctly

15:34.080 --> 15:39.200
handled, it should work. And the format in which we generate the graph is the same

15:39.200 --> 15:44.440
as Dask's, which is a popular library, so you can run it on Dask and on

15:44.440 --> 15:51.240
Scaler. What was the second question again?

15:51.240 --> 16:02.960
The format is pretty standard; it's a Python dictionary.
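
NOTE
For reference, the Dask graph format is a plain dictionary mapping keys to tasks, where a task is a (callable, *args) tuple and string arguments refer to other keys; a tiny standard example:
from operator import add, mul
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),   # depends on "x" and "y"
    "out": (mul, "sum", 10),  # depends on "sum"
}
# Any Dask-compatible scheduler can execute it, e.g.
# dask.get(graph, "out") returns 30.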

16:02.960 --> 16:08.800
I'm just going to read one question that was written in the chat. It says: with Scaler, how does

16:08.800 --> 16:12.520
it handle Python dependencies? Say, if you need a

16:12.520 --> 16:16.040
particular package installed for your tasks, do you need to make sure

16:16.040 --> 16:22.040
that it's on your nodes in advance? Yeah, so we are using cloudpickle, so it's

16:22.040 --> 16:26.400
basically the same constraints as cloudpickle's, so you might want to have the

16:26.400 --> 16:32.680
libraries installed, however you can override the serializer so if you want to

16:32.680 --> 16:36.760
do something more complex you could technically do it but yeah that's one of the

16:36.760 --> 16:40.720
complexities: you have to install the libraries on all the machines

16:40.720 --> 16:46.640
beforehand. Hello, thank you for the talk. What is the relation between

16:46.640 --> 16:56.720
Scaler and something like Celery? I never used it, but it's mostly a queue; I don't

16:56.720 --> 17:01.720
think it uses the executor interface, but I might be wrong, I never used it.

17:01.720 --> 17:09.760
Yeah, sorry, I don't know much about Celery, so I can't really

17:09.760 --> 17:16.840
answer. Hi, yeah, thanks for the interesting presentation. I was wondering about

17:16.840 --> 17:21.120
testing. Do you have support, like, if I add these decorators, can I

17:21.120 --> 17:28.720
disable them when testing, or how do I...? Yeah, for the graph, actually, if

17:28.720 --> 17:34.240
you don't call the to_graph, it just looks like a Python function, so you can

17:34.240 --> 17:42.360
just run it sequentially. And for Parfun, we actually renamed the original

17:42.360 --> 17:47.080
function, so you just add an underscore-sequential suffix and you get access to the original

17:47.080 --> 17:52.200
function. You can also disable it system-wide by calling some

17:52.200 --> 17:58.520
function. And the second question, if I may: with Parfun there's no explicit

17:58.520 --> 18:04.200
connection to Scaler; is it, like, something under it? Yeah, you have to set

18:04.200 --> 18:10.280
this up: you can either create a context, you know, the "with" statement

18:10.280 --> 18:17.840
in Python, or you set up the backend on the whole process. Those are the

18:17.840 --> 18:21.720
two ways we handle that, so you can define the function and use different

18:21.720 --> 18:24.840
backends, and call the function using different backends at the same time.
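
NOTE
A hypothetical sketch of the two ways just described (a scoped context versus a process-wide default); the function and backend names here are illustrative stand-ins, not necessarily the real Parfun API:
# Process-wide: every decorated function uses this backend from now on.
# set_parallel_backend("scaler", scheduler_address="tcp://scheduler-host:2345")
#
# Scoped: only calls made inside the `with` block use this backend.
# with parallel_backend("multiprocessing"):
#     totals = count_words(lines)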

18:24.840 --> 18:35.840
Yeah, thanks for the presentation. I have two questions. One: how do you

18:35.840 --> 18:41.880
set up the cluster? I think you mentioned cloudpickle, is that it? So,

18:41.880 --> 18:54.600
the scheduler and the workers, there are two kinds of processes, and they

18:54.600 --> 18:58.840
are just Python programs, so you just start the programs and they all connect to

18:58.840 --> 19:06.120
some IP address, or whatever you want; the configuration, it's just Python

19:06.120 --> 19:12.120
commands, so all the configuration is in the parameters, in the arguments.

19:12.120 --> 19:17.280
Okay, thanks. And on the failure scenario: you showed how to recover from a

19:17.280 --> 19:24.600
worker failing; what if the scheduler itself dies? Yeah, you don't... yeah, we...

19:24.600 --> 19:34.080
We... I have a question in terms of the worst case scenario: do you mind going

19:34.080 --> 19:41.800
to the slide where, like, one of the cluster nodes dies? Yeah. So I'm wondering,

19:41.800 --> 19:45.920
just because of the syntax. Let's take the pi example: like, you

19:45.920 --> 19:49.720
run it, the scheduler runs all the jobs, and then at the end it sums everything together

19:49.720 --> 19:54.320
so you can get, like, the result of pi. In a worst case scenario, let's say the

19:54.320 --> 19:59.280
cluster dies, or all the nodes die, and one of the jobs never completes:

19:59.280 --> 20:02.240
what happens to that computation? What happens to that sum where you're

20:02.240 --> 20:06.920
adding stuff; do you end up adding, like, a None, let's say? Oh, you mean if your

20:06.920 --> 20:10.960
process... So you're running processes that are so long that, let's say,

20:10.960 --> 20:15.080
one of the nodes just dies and you never get a response and you're adding

20:15.080 --> 20:18.880
over multiple processes; does it return a None, and then it tries to add a

20:18.880 --> 20:23.080
None and then you get, like, a Python error? I'm not sure I understand, but the

20:23.080 --> 20:26.440
workers are constantly connected to the scheduler, so if they lose the

20:26.440 --> 20:31.520
connection, there's a kind of ping, so if the latency becomes too big, like one

20:31.520 --> 20:36.960
minute, the worker will be considered dead, and all the tasks will be

20:36.960 --> 20:41.440
redirected. Yeah, but if the worker is never able to answer, what does

20:41.440 --> 20:47.080
the scheduler return, like if it never gets an answer for a job? Oh, but if

20:47.080 --> 20:52.240
this one dies, the scheduler keeps the tasks on it? Yeah, it

20:52.240 --> 20:57.160
depends, we have a parameter for that. If you don't store the tasks on the

20:57.160 --> 21:05.000
scheduler, obviously it will return an exception to the client, but by default we

21:05.000 --> 21:09.440
store the task object, so the arguments and the function code,

21:09.440 --> 21:19.320
basically, and we can resubmit the task. Thank you. I had another question, for the

21:19.320 --> 21:24.400
graph execution: can the graph be modified while it's running, or do you have to

21:24.400 --> 21:30.760
compile it beforehand? You could technically modify it yourself;

21:30.760 --> 21:40.600
as I said, it builds a kind of abstract tree of the computation as a

21:40.600 --> 21:44.480
Python dictionary so you could technically access it and modify it but the library

21:44.480 --> 22:04.080
doesn't do it. How does the system handle data exchange between

22:04.080 --> 22:09.480
workers? Like, a result of a function in the graph is returned and is

22:09.480 --> 22:17.160
needed by another function, I mean for the graph computation. Yeah, so it will not

22:17.160 --> 22:23.360
send the tasks unless they are needed; we have it handled on the scheduler

22:23.360 --> 22:29.280
side and on the client side, so we're not queueing the tasks to the workers until we

22:29.280 --> 22:33.640
have the arguments. The client waits for the data to come back. And it's

22:33.640 --> 22:39.600
like, okay, now the next task that is scheduled will get that input? I'm not sure

22:39.600 --> 22:45.360
I understand. Well, if you have, like, functions and there are dependencies

22:45.360 --> 22:50.080
between them, right, data dependencies, so one function takes as input the output of

22:50.080 --> 22:54.640
another function. Yeah, like, how does that data flow from one... Oh, it might be

22:54.640 --> 22:59.080
scheduled, and it doesn't get back to the client unless you request it. Okay, so

22:59.080 --> 23:04.920
it's the scheduler that will... So let's say that the worker on top wants to execute

23:04.920 --> 23:09.160
a function that was executed there; the data goes back to the scheduler and then

23:09.160 --> 23:19.160
it gets back to the other worker that needs the input. Just a quick one,

23:19.160 --> 23:23.160
maybe I got distracted and missed that part: the batch size that you

23:23.160 --> 23:26.760
mentioned, is it something where you need to do the bookkeeping of the size

23:26.760 --> 23:30.600
of it, or is it automatically calculated somewhere in the background? It's calculated

23:30.600 --> 23:35.400
automatically, yeah. Is it like an AI? No, it's basically machine learning;

23:35.400 --> 23:40.920
it's a regression so it looks at the function when you execute it and it tries

23:40.920 --> 23:45.800
different batch sizes, and after a few tries it will deduce that the optimal

23:45.800 --> 23:51.840
one is whatever it is and it will continue to learn so if you change the parameters

23:51.840 --> 23:56.240
for example and suddenly the function doesn't behave the same it

23:56.240 --> 24:06.320
starts to learn again. Okay, thank you. So I think that's it in terms of

24:06.320 --> 24:12.000
questions, thank you very much. Ah, there's one more, sorry. I've got 58 seconds.

24:12.000 --> 24:18.440
So, coming from Kubernetes, it looks similar in design. The big

24:18.440 --> 24:24.680
problems in Kubernetes are CPU limits, memory limits, even IO limits. So how

24:24.680 --> 24:30.760
do you deal with that? So we've got a feature, I'm working on it, it will be

24:30.760 --> 24:36.200
released next week, but we have what we call tags. So each worker can have

24:36.200 --> 24:45.160
specific tags, so it's just simple strings, and you can say worker one has a lot of memory,

24:45.160 --> 24:50.280
and then when you submit the task it will only be redirected to the

24:50.280 --> 24:58.440
workers that have this specific tag. So yeah, that's how we kind of handle that. And if you

24:58.440 --> 25:03.560
want to limit the CPU usage inside the process, you have to run it in a cgroup, so

25:03.560 --> 25:10.920
things like that, you have to deal with that at the operating system level. Okay, so thank you,

25:10.920 --> 25:15.000
thank you very much

