
Ralsina.Me — The website of Roberto Alsina

Airflow By Example

Apache Airflow is a very interesting, popular and free tool to create, manage and monitor workflows, for example if you want to do ETL (Extract / Transform / Load) on data.

This sort of enterprise software often may seem complicated or overly unrelated to our everyday experience as developers but ... is it, really? How about if I just want to watch some TV shows? And experiment with some enterprise-level software at the same time?

Let's do that by learning how to use Airflow to watch TV.


Caveat: This post was originally a Twitter thread, which is why all the examples are images and you can't copy/paste them. But hey, at least they are short. Also, expect typos, because I really just did this while tweeting it, with no preparation beforehand.

Just in case: I did not download any "Star Trek: Picard" episodes, and I have a Prime Video subscription, so I don't need to download them via torrent. OTOH, if Sir Patrick ever reads this (which he won't): good job, sir!


A thread by Roberto Alsina

This is a script that gives you the information about the latest already-aired episode of a TV series.
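That script is only visible as an image in the thread, but the core logic is easy to sketch. Here is a minimal, self-contained version of the interesting part; the episode data below is made up, and a real version would fetch it from a TV database API:

```python
from datetime import date


def latest_aired_episode(episodes, today=None):
    """Given (season, episode, airdate) tuples, return the most recently
    aired one, or None if nothing has aired yet."""
    today = today or date.today()
    aired = [ep for ep in episodes if ep[2] <= today]
    return max(aired, key=lambda ep: ep[2]) if aired else None


# Hypothetical airdates; a real script would get these from a TV database.
picard_s1 = [
    (1, 1, date(2020, 1, 23)),
    (1, 2, date(2020, 1, 30)),
    (1, 3, date(2020, 2, 6)),
]
```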

And this is a script that gives you the link to a torrent to download that episode.
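Again, the real script is an image (it talked to a torrent site's API); the part worth sketching is how the search string is built from the episode data:

```python
def episode_query(series_name, season, episode):
    """Build the conventional search string torrent sites understand,
    e.g. 'Picard S01E05'."""
    return "{} S{:02d}E{:02d}".format(series_name, season, episode)
```

A real script would then feed this string to a tracker's search API and pick a result to get the magnet link.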

This, on the other hand, is a script to download that torrent.
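The original download script is an image too. One simple way to do "download this torrent" is to hand the magnet link to an already-running torrent client; this sketch builds the command line for transmission-remote (-a adds a torrent, -w sets the download directory), but any client would do:

```python
def add_torrent_command(magnet_link, download_dir):
    """Command line that hands a magnet link to a running transmission
    daemon: -a adds the torrent, -w sets where it gets downloaded."""
    return [
        "transmission-remote",
        "-a", magnet_link,
        "-w", download_dir,
    ]

# A real script would then run it, e.g.:
# subprocess.run(add_torrent_command(link, "/srv/downloads"), check=True)
```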

Of course, one thing about this script is not like the other scripts.

While the others take a couple of seconds to run, this one may take hours or days. But let's ignore that for now.

This is a script that moves your downloaded file into a nice directory hierarchy.

However, I think this is nicer because it does the same thing but with names "guaranteed" to be right, more uniform, and transcodes it to, say, something a Chromecast would like.
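As a sketch of what "names guaranteed to be right" means: build the destination path from the episode metadata instead of parsing whatever the torrent happened to be called, and build the ffmpeg command for the transcode. The library layout and codec choices here are my assumptions:

```python
from pathlib import Path


def target_path(library_root, series_name, season, episode, extension=".mkv"):
    """Uniform layout: <root>/<Series>/Season NN/<Series> SNNENN.ext"""
    name = "{} S{:02d}E{:02d}{}".format(series_name, season, episode, extension)
    return Path(library_root) / series_name / "Season {:02d}".format(season) / name


def transcode_command(source, destination):
    """ffmpeg command producing H.264 video and AAC audio, a combination
    a Chromecast is happy with."""
    return ["ffmpeg", "-i", str(source),
            "-c:v", "libx264", "-c:a", "aac", str(destination)]
```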

I could add extra tiny scripts that get subtitles for the languages you like and put them in the right location, and so on, but you get the idea.

Basically: it's easy to automate "I want to watch the latest episode of Picard".

However, it's totally impractical, because:

1) You have to go and tell it to get it
2) It takes hours
3) It may fail, and then you have to do it again
4) It will take hours again

But what if there was a way to define this set of tasks so that they run automatically, you know when they are working, when they start and when they finish, and you have to do nothing except wait for a message in Telegram that tells you "go watch Picard"? And that's where Apache Airflow enters the picture.

If you have a script that is a number of steps which depend on one another and are executed in order, then it's possible to convert them into very simple Airflow DAGs. Now I will take a little while to learn exactly HOW to do that, and will continue this thread in a bit. Because, really, ETL (Extract / Transform / Load) is not as complicated as it may appear to be in most cases. BTW, it is possible to have Airflow with Python 3.8 (because it's nicer).

Now, this may look complicated, but really, I am defining a DAG (Directed Acyclic Graph), or "thingies connected with arrows, with no loops in it".

What are the "thingies" inside an Airflow DAG? Operators. They come in many flavors, but I am using this one now, which sets up a venv, runs a function, then removes everything. Nice and clean. (airflow.apache.org/docs/stable/_a…) So, let's convert this into an Airflow operator.

It's not hard! Operators are (in the case of Python operators) simply functions.

Details: do all the imports inside the function.

Have a list of requirements ready if you require things.

Now I need to put this operator inside the DAG I created earlier. Again, it's a matter of declaring things.
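Since the original code is only visible as images, here is roughly what that DAG file could look like with the Airflow 1.10-era API. The DAG id, schedule, and requirements list are my guesses; the task id Check_Picard matches the one that shows up in the UI screenshots later. This is a DAG-file sketch, not something that runs without a working Airflow install:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator


def check_series():
    # All imports go INSIDE the function: it runs in a freshly created
    # virtualenv, not in the scheduler's own process.
    # A real version would query a TV database here.
    return {"series_name": "Picard", "season": 1, "episode": 5}


dag = DAG(
    "tv_watcher",                    # hypothetical DAG id
    schedule_interval="@daily",      # check for new episodes once a day
    start_date=datetime(2020, 3, 1),
)

check_picard = PythonVirtualenvOperator(
    task_id="Check_Picard",
    python_callable=check_series,
    requirements=["requests"],       # whatever the function imports
    python_version="3.8",
    dag=dag,
)
```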

So, now that we have a (very stupid, one-node, no-arrows) DAG ... what can we do with it?

Well, we can make sure Airflow sees it (you need to tell Airflow where your DAGs live, via the dags_folder setting in airflow.cfg).

We can check that our DAG has some task in it!

A task is just an instance of an Operator; we added one, so there should be one.

And of course, we can TEST the thing (with the "airflow test" subcommand), and have our task do its trick. Note that I have to pass a date. We could even use that in a 0.0.2 version to check episodes we missed!

Hey, that worked! (BTW, of course it did not work THE FIRST TIME, come on).

Backfill means "start at this date and run whatever would have run if we had actually started at that date".

Now, if you run "airflow scheduler" and "airflow webserver" you can see things like this:

And yes, that means this task will run daily and report everything in the nice web UI and all that.

But of course a single-task DAG is a lame DAG, so let's make it a bit more interesting. Now, let's create a second operator, which will run AFTER the one we already have is finished, and use its output.

It's based on this:

Following the same mechanical changes as before (imports in the function, etc.) it will look like this:

This uses two pieces of data from the previous task.
So, we need to do 2 things:

1) Connect the two operators
2) Pass data from one to the other

Connecting two tasks in a DAG is simple. Declare them both and tell Airflow they are connected and in what direction.
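In code, the arrow is literally an operator. Assuming the two tasks are declared as check_picard and search_torrent in the same DAG file (names are my guesses based on the task ids in the screenshots), the declaration is one line:

```python
# ">>" declares "run the left side, then the right side":
check_picard >> search_torrent

# Equivalent, more verbose spellings:
# check_picard.set_downstream(search_torrent)
# search_torrent.set_upstream(check_picard)
```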

And of course that's now reflected in the Airflow UI. Here you can see that Check_Picard has been successful (dark green border) and Search_Torrent has no status because it never ran (white border).

It's probably worth mentioning that patience is important at this point in the project, since "it runs quickly with immediate feedback" is not one of the benefits we are getting here.

This will be slower than just running the scripts by hand. And now we have the Search_Torrent task failing.

Why?

Well, luckily we are using Airflow, so we have logs!

The problem is, Search_Torrent is not getting the right arguments. It "wants" a dict with at least series_name, season and episode in it.

And ... that's not how these things work in Airflow :-)

Slight detour, I just ran into this: (issues.apache.org/jira/browse/AI…)

So, I need to rewrite my nice virtualenved operators into uglier not-virtualenved ones.

Shame, Airflow, shame. BTW, this is a minor but important part of developing software. Sometimes, you are doing things right and it will not work because there is a bug somewhere else.

Suck it up!

So, back to the code, let's recap. I now have two operators. One operator looks for the latest episode of a TV series (example: Picard), and returns all the relevant data in a serializable thing, like a dict of dicts of strings and ints.

The second one will search for a torrent of a given episode of a series. It uses the series name, the season number, and the episode number.

How does it know the data it needs to search for? Since it was returned by the previous task in the DAG, it gets it from the "context". Specifically, it's there as an XCom with key "return_value".
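The pulling side can be sketched without any Airflow machinery, because with provide_context=True the callable simply receives the runtime context as keyword arguments, "ti" being the current task instance. The upstream task id and the magnet stand-in below are hypothetical:

```python
def search_torrent(**context):
    # "ti" is the current TaskInstance; xcom_pull with no explicit key
    # fetches the upstream task's "return_value" XCom.
    episode = context["ti"].xcom_pull(task_ids="Check_Picard")
    query = "{} S{:02d}E{:02d}".format(
        episode["series_name"], episode["season"], episode["episode"]
    )
    # A real version would search a tracker for `query`; here we just
    # attach a stand-in magnet link to the data we got, and return THAT.
    episode["magnet"] = "magnet:?query=" + query
    return episode
```

A nice side effect: this is easy to unit test, since any object with an xcom_pull method can stand in for the real task instance.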

And once it has all the interesting torrent information, it just adds it to the data it got and returns THAT.

How do we use these operators in our DAG?

To search for Picard, I use my tvdb operator with an argument "Picard".

To get the Picard torrent, I use my rarbg operator, with provide_context=True, so it can access the output of the other operator.

And then I hook them up

Does it work? Yes it does!

So, let's make it more interesting. A DAG of 2 nodes with one arrow is not the least interesting DAG ever, but it's close!

So, what happens if I also want to watch ... "The Rookie"? I can just set up a new tvdb operator with a different argument, and connect them both to the operator that searches for torrents.
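In the DAG file, that fan-in is one line: putting a list on the left of >> draws an arrow from each element to the task on the right. Task names are, again, my guesses:

```python
# Both checks run independently; the torrent search runs after each:
[check_picard, check_rookie] >> search_torrent
```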

In the Airflow graph view it looks like this:

And in another view, the tree view, it looks like this:

So, this DAG will look every day for new episodes of Picard or The Rookie and then, if there is one, will trigger a torrent search for it. Adding further operators and tasks to actually download the torrent, transcode it, put it in the right place, get subtitles, and so on ... is left as an exercise for the reader (I gave some hints at the beginning of the thread).

If you en­joyed this thread, con­sid­er hir­ing me ;-)

Senior Python Dev, eng mgmt experience, remote preferred, based near Buenos Aires, Argentina. (ralsina.me/weblog/posts/l…)


Contents © 2000-2024 Roberto Alsina