-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathagent_pools.py
79 lines (61 loc) · 2.81 KB
/
agent_pools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
Module for Terraform Enterprise/Cloud Migration Worker: Agent Pools.
"""
from .base_worker import TFCMigratorBaseWorker
class AgentPoolsWorker(TFCMigratorBaseWorker):
"""
A class to represent the worker that will migrate all agent pools from one
TFC/E org to another TFC/E org.
"""
_api_module_used = "agents"
_required_entitlements = ["agents"]
def migrate(self):
"""
Function to migrate all agent pools from one TFC/E org to another TFC/E org.
"""
self._logger.info("Migrating agent pools...")
# Only perform this migration if it's TFC to TFC, otherwise it won't work.
agent_pools_map = {}
# Fetch agent pools from existing org
source_agent_pools = self._api_source.agents.list_pools()["data"]
target_agent_pools = self._api_target.agents.list_pools()["data"]
target_agent_pool_data = {}
for target_agent_pool in target_agent_pools:
target_agent_pool_data[target_agent_pool["attributes"]["name"]] = \
target_agent_pool["id"]
for source_agent_pool in source_agent_pools:
source_agent_pool_name = source_agent_pool["attributes"]["name"]
source_agent_pool_id = source_agent_pool["id"]
if source_agent_pool_name in target_agent_pool_data:
agent_pools_map[source_agent_pool_id] = \
target_agent_pool_data[source_agent_pool_name]
self._logger.info(\
"Agent pool: %s, exists. Skipped.", source_agent_pool_name)
continue
# Build the new agent pool payload
new_agent_pool_payload = {
"data": {
"type": "agent-pools",
"attributes": {
"name": source_agent_pool_name
}
}
}
# Create Agent Pool in the target org
new_agent_pool = self._api_target.agents.create_pool(new_agent_pool_payload)
new_agent_pool_id = new_agent_pool["data"]["id"]
agent_pools_map[source_agent_pool_id] = new_agent_pool_id
self._logger.info("Agent pools migrated.")
return agent_pools_map
def delete_all_from_target(self):
"""
Function to delete all agent pools from the target TFC/E org.
"""
self._logger.info("Deleting agent pools...")
agent_pools = self._api_target.agents.list_pools()["data"]
if agent_pools:
for agent_pool in agent_pools:
if agent_pool["attributes"]["name"] != "Default":
self._logger.info("Agent pool: %s, deleted.", agent_pool["attributes"]["name"])
self._api_target.agents.destroy_pool(agent_pool["id"])
self._logger.info("Agent pools deleted.")