We love a good graph
By Nate Nowack
One pretty frequent question we get from users is:
I just got a brand new Prefect Cloud workspace. How do I migrate all my workflows and their config from my proof-of-concept open-source setup?
The historical answer has been:
Here’s the prefect client, here are the methods, go write a script 👍 good luck!
… which is obviously not ideal.
So, we’ve built a new Prefect command that does this for you. It’s called prefect transfer, and it will be available in Prefect 3.4.14. It can transfer your resources between any two profiles, so you can migrate:
- from an open-source Prefect server to a Prefect Cloud workspace
- from a Prefect Cloud workspace to an open-source Prefect server
- from an open-source Prefect server to another open-source Prefect server
- from a Prefect Cloud workspace to another Prefect Cloud workspace
How does that happen?
In Prefect, we have the concept of profiles. A profile is essentially a group of settings that constitutes a “Prefect environment”. See yours with
vim ~/.prefect/profiles.toml
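For illustration, a profiles.toml with a local server profile and a Cloud workspace profile might look something like this (the URLs and key below are placeholders, not real values):

active = "local"

[profiles.local]
PREFECT_API_URL = "http://127.0.0.1:4200/api"

[profiles.prod]
PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
PREFECT_API_KEY = "<your-api-key>"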
1. You identify the source profile, e.g. local, that contains the settings required to talk to the environment where all your stuff lives (try prefect profile ls)
2. You identify the destination profile (where you want to move your stuff), e.g. prod
3. You run prefect transfer --from local --to prod
That’s it! All of your resources will now exist in your destination profile. Resources that already exist in the destination are skipped (i.e. the command is idempotent).
“Cool story bro?”
On its face, this is a pretty pedestrian quality-of-life utility, but (as fans of graphs) we think the implementation is pretty interesting!
Path Dependencies
The key challenge is that certain types of resources cannot exist without other resources. For example, a deployment cannot exist without a flow. A work queue cannot exist without a work pool.
So, before we transfer a resource, we must make sure that resource’s dependencies have been transferred first.
This problem might sound familiar to you. It’s the same problem that Kahn’s algorithm solves. Here’s a nice video explaining the algorithm.
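For intuition, here’s a minimal, generic sketch of Kahn’s algorithm in Python (illustrative only, not the transfer code itself): repeatedly emit nodes whose dependencies have all been emitted, and if anything is left over, there must be a cycle.

from collections import defaultdict, deque

def topological_order(dependencies: dict) -> list:
    """dependencies maps each node to the set of nodes it depends on."""
    nodes = set(dependencies) | {d for deps in dependencies.values() for d in deps}
    in_degree = {n: len(dependencies.get(n, set())) for n in nodes}
    dependents = defaultdict(set)
    for node, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(node)

    # Nodes with no dependencies are ready immediately
    ready = deque(n for n in nodes if in_degree[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        # Placing a node may make its dependents ready
        for dependent in dependents[node]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                ready.append(dependent)

    if len(order) != len(nodes):
        raise ValueError("cycle detected")
    return order

For example, topological_order({"deployment": {"flow", "work_pool"}, "work_queue": {"work_pool"}}) places flow and work_pool before the deployment and work queue that need them.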
The DAG Implementation
As far as our problem is concerned, the basic idea is that each type of resource can discover its own dependencies:
class MigratableDeployment(MigratableResource[DeploymentResponse]):
    async def get_dependencies(self) -> list[MigratableProtocol]:
        deps = []
        # A deployment needs its flow
        if self.source_deployment.flow_id:
            flow = await client.read_flow(self.source_deployment.flow_id)
            deps.append(await MigratableFlow.construct(flow))
        # And its work pool
        if self.source_deployment.work_pool_name:
            pool = await client.read_work_pool(self.source_deployment.work_pool_name)
            deps.append(await MigratableWorkPool.construct(pool))
        return deps
We build a DAG from these dependencies. Before execution, we verify it’s acyclic using three-color DFS.
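A generic sketch of that check (again illustrative, not the actual implementation): each node is white (unvisited), gray (currently on the DFS path), or black (fully explored). Reaching a gray node again means we found a back edge, i.e. a cycle.

WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, fully explored

def has_cycle(graph: dict) -> bool:
    """graph maps each node to the list of nodes it depends on."""
    color = {node: WHITE for node in graph}

    def visit(node) -> bool:
        color[node] = GRAY
        for dep in graph.get(node, []):
            if color.get(dep, WHITE) == GRAY:
                return True  # back edge: dep is already on our path, so there's a cycle
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)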
The execution uses self-spawning workers with no central scheduler:
async def worker(nid: uuid.UUID, tg: TaskGroup):
    async with semaphore:  # Respect max_workers limit
        try:
            await process_node(nid)

            # Check dependents under lock to prevent races
            async with self._lock:
                for dependent in self._status[nid].dependents:
                    if all(self._status[d].state == COMPLETED
                           for d in dependent.dependencies):
                        tg.start_soon(worker, dependent, tg)

        except (TransferSkipped, Exception):
            # Both intentional skips and failures cascade to descendants
            # TransferSkipped = "already exists", Exception = actual failure
            to_skip = deque([nid])
            while to_skip:
                cur = to_skip.popleft()
                for descendant in self._status[cur].dependents:
                    if descendant.state in {PENDING, READY}:
                        descendant.state = SKIPPED
                        to_skip.append(descendant)
The semaphore bounds concurrency, the lock prevents race conditions when checking dependencies, and the deque ensures we skip entire subtrees when failures occur. If a work pool fails, its queues get skipped, but unrelated branches continue.
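To make “self-spawning” concrete, execution might be kicked off roughly like this (a sketch under assumed names like execute and node.dependencies, not Prefect’s actual internals): seed one worker per node that has no dependencies, and let each worker spawn its newly-ready dependents as shown above.

import anyio

async def execute(self) -> None:
    async with anyio.create_task_group() as tg:
        # Seed only the roots; everything downstream is spawned by the workers themselves
        for nid, node in self._status.items():
            if not node.dependencies:
                tg.start_soon(worker, nid, tg)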
No more scripting around 409s to move your resources!
Excited to try it? Tried it already and have issues? Let us know on GitHub!