Prefect Dev Log

We love a good graph

By Nate Nowack

One pretty frequent question we get from users is:

I just got a brand new Prefect Cloud workspace. How do I migrate all my workflows and their config from my proof-of-concept open-source setup?

The historical answer has been:

Here’s the prefect client, here are the methods, go write a script 👍 good luck!

… which is obviously not ideal.

So, we’ve built a new Prefect command that does this for you. It’s called prefect transfer and it will be available in Prefect 3.4.14. It can transfer your resources between any two profiles, which means you can migrate from a self-hosted server to Prefect Cloud, from one workspace to another, or between any pair of environments you can point a profile at.

How does that happen?

  1. In Prefect, we have an idea of profiles. A profile is essentially a group of settings that constitutes a “Prefect environment”. See yours with vim ~/.prefect/profiles.toml (a sketch of what that file might contain follows this list)

  2. You identify the source profile, e.g. local, that contains the settings required to talk to the environment where all your stuff lives (try prefect profile ls)

  3. You identify the destination profile (where you want to move your stuff) e.g. prod

  4. You run prefect transfer --from local --to prod
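For reference, a profiles.toml for this setup might look something like the following. This is a hypothetical example: the URLs, account/workspace IDs, and API key are placeholders, not values from a real workspace.

```toml
active = "local"

[profiles.local]
PREFECT_API_URL = "http://127.0.0.1:4200/api"

[profiles.prod]
PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
PREFECT_API_KEY = "<your-api-key>"
```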

That’s it! All of your resources (flows, deployments, work pools¹, work queues, and friends) will now exist in your destination profile. Resources that already exist in the destination are skipped (i.e. the command is idempotent).

“Cool story bro?”

On its face, this is a pretty pedestrian quality-of-life utility, but (as fans of graphs) we think the implementation is pretty interesting!

Path Dependencies

The key challenge is that certain types of resources cannot exist without other resources. For example, a deployment cannot exist without a flow. A work queue cannot exist without a work pool.

So, when we transfer a resource, we must ensure that its dependencies have been transferred first.

This problem might sound familiar to you. It’s the same problem that Kahn’s algorithm solves: ordering a DAG’s nodes so that every node comes after all of its dependencies. Here’s a nice video explaining the algorithm.
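If you haven’t seen it before, here’s a minimal, self-contained sketch of Kahn’s algorithm (illustrative only, not our actual implementation): count how many unmet dependencies each node has, repeatedly emit nodes with zero, and decrement their dependents as you go.

```python
from collections import deque

def topological_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm over a mapping of node -> the nodes it depends on."""
    # How many dependencies each node is still waiting on
    in_degree = {node: len(deps) for node, deps in dependencies.items()}
    # Invert the edges so we can find dependents cheaply
    dependents: dict[str, list[str]] = {node: [] for node in dependencies}
    for node, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(node)

    # Start with nodes that depend on nothing
    ready = deque(node for node, count in in_degree.items() if count == 0)
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for dependent in dependents[node]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                ready.append(dependent)

    if len(order) != len(dependencies):
        raise ValueError("cycle detected: no valid transfer order exists")
    return order

# e.g. flows and work pools come out before the deployment that needs them:
print(topological_order({
    "deployment": {"flow", "work-pool"},
    "work-queue": {"work-pool"},
    "flow": set(),
    "work-pool": set(),
}))
# ['flow', 'work-pool', 'deployment', 'work-queue']
```

If the queue drains before every node has been emitted, some cycle is blocking progress, which is exactly the condition we need to rule out before transferring anything.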

The DAG Implementation

As far as our problem is concerned, the basic idea is that each type of resource can discover its own dependencies:

```python
class MigratableDeployment(MigratableResource[DeploymentResponse]):
    async def get_dependencies(self) -> list[MigratableProtocol]:
        # `client` is an API client bound to the source profile
        deps = []
        # A deployment needs its flow
        if self.source_deployment.flow_id:
            flow = await client.read_flow(self.source_deployment.flow_id)
            deps.append(await MigratableFlow.construct(flow))
        # And its work pool
        if self.source_deployment.work_pool_name:
            pool = await client.read_work_pool(self.source_deployment.work_pool_name)
            deps.append(await MigratableWorkPool.construct(pool))
        return deps
```

We build a DAG from these dependencies. Before execution, we verify it’s acyclic using three-color DFS.
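The three-color check is a classic; a minimal sketch (again illustrative, not our exact code) looks something like this. WHITE means unvisited, GRAY means currently on the DFS path, BLACK means fully explored; reaching a GRAY node means we’ve found a back edge onto our own path, i.e. a cycle.

```python
from enum import Enum, auto

class Color(Enum):
    WHITE = auto()  # not yet visited
    GRAY = auto()   # currently on the DFS path
    BLACK = auto()  # fully explored, provably not part of a cycle

def has_cycle(dependencies: dict[str, set[str]]) -> bool:
    color = {node: Color.WHITE for node in dependencies}

    def visit(node: str) -> bool:
        color[node] = Color.GRAY
        for dep in dependencies[node]:
            if color[dep] is Color.GRAY:
                return True  # back edge: a cycle
            if color[dep] is Color.WHITE and visit(dep):
                return True
        color[node] = Color.BLACK
        return False

    return any(color[node] is Color.WHITE and visit(node) for node in dependencies)
```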

The execution uses self-spawning workers with no central scheduler:

```python
async def worker(self, nid: uuid.UUID, tg: TaskGroup) -> None:
    async with self._semaphore:  # respect the max_workers limit
        try:
            await self._process_node(nid)

            # Check dependents under lock to prevent races
            async with self._lock:
                for dep_id in self._status[nid].dependents:
                    if all(
                        self._status[d].state == COMPLETED
                        for d in self._status[dep_id].dependencies
                    ):
                        tg.start_soon(self.worker, dep_id, tg)

        except Exception:
            # Both intentional skips and failures cascade to descendants:
            # TransferSkipped = "already exists", anything else = actual failure
            to_skip = deque([nid])
            while to_skip:
                cur = to_skip.popleft()
                for desc_id in self._status[cur].dependents:
                    if self._status[desc_id].state in {PENDING, READY}:
                        self._status[desc_id].state = SKIPPED
                        to_skip.append(desc_id)
```
The semaphore bounds concurrency, the lock prevents race conditions when checking dependencies, and the deque ensures we skip entire subtrees when failures occur. If a work pool fails, its queues get skipped but unrelated branches continue.
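To make “self-spawning” concrete, here’s a hypothetical kick-off method (the name execute and the node bookkeeping are our illustrative choices; tg.start_soon is anyio’s task group API, which the worker above also uses): seed one worker per root node, and the task group waits for everything spawned after that.

```python
import anyio

async def execute(self) -> None:
    async with anyio.create_task_group() as tg:
        # Seed only the roots: nodes with no dependencies at all.
        # Every other node is started later, by whichever worker
        # completes its final dependency. No central scheduler.
        for nid, node in self._status.items():
            if not node.dependencies:
                tg.start_soon(self.worker, nid, tg)
    # Exiting the task group waits for all spawned workers to finish.
```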

No more scripting around 409s just to move your resources!

Excited to try it? Tried it already and have issues? Let us know on GitHub!


  1. On Prefect Cloud, the work pools you can create may be limited by your plan.


#Data Platform #CLI #DAG