Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 104 additions & 35 deletions hardeneks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
NamespacedResources,
Resources,
)
from .harden import harden
from .harden import harden, cluster_data


app = typer.Typer()
Expand All @@ -40,28 +40,69 @@ def _config_callback(value: str):
return value


def _get_current_context(context):
if context:
return context
_, active_context = kubernetes.config.list_kube_config_contexts()
return active_context["name"]
def _get_cluster_name_from_context(clusterNameStr):

if clusterNameStr.endswith('eksctl.io'):
clusterName = clusterNameStr.split('.')[0]
elif clusterNameStr.startswith('arn:'):
clusterName = clusterNameStr.split('/')[-1]
else:
clusterName = clusterNameStr

return clusterName



def _get_current_context(contextFromUser, clusterFromUser):

contextName = None
clusterName = None

#print("contextFromUser={} clusterFromUser={}".format(contextFromUser, clusterFromUser))

contextList, active_context = kubernetes.config.list_kube_config_contexts()

if contextFromUser:
contextName = contextFromUser
if clusterFromUser:
clusterName = clusterFromUser
else:
for contextData in contextList:
#print("contextData={}".format(contextData))
if contextData['name'] == contextFromUser:
clusterName = _get_cluster_name_from_context(contextData['context']['cluster'])
else:
if clusterFromUser:
clusterName = clusterFromUser
for contextData in contextList:
clusterNameFromContext = _get_cluster_name_from_context(contextData['context']['cluster'])
#print("clusterNameFromContext={} clusterFromUser={}".format(clusterNameFromContext, clusterFromUser))
if clusterNameFromContext == clusterFromUser:
contextName = contextData['name']
print("contextName={}".format(contextName))

else:
contextName = active_context['name']
clusterName = _get_cluster_name_from_context(active_context['context']['cluster'])


if contextName and clusterName:
#print("contextName={} clusterName={}".format(contextName, clusterName))
return (contextName, clusterName)
else:
#print("contextName={} and clusterName={} are not valid. Exiting the program".format(contextName, clusterName))
sys.exit()


def _get_namespaces(ignored_ns: list) -> list:
v1 = kubernetes.client.CoreV1Api()
namespaces = [i.metadata.name for i in v1.list_namespace().items]
return list(set(namespaces) - set(ignored_ns))


def _get_cluster_name(context, region):
try:
client = boto3.client("eks", region_name=region)
for name in client.list_clusters()["clusters"]:
if name in context:
return name
except EndpointConnectionError:
raise ValueError(f"{region} seems like a bad region name")

def _get_pillars() -> list:
pillarsList = ["security", "reliability", "cluster-autoscaling", "networking"]
return pillarsList


def _get_region():
return boto3.session.Session().region_name
Expand Down Expand Up @@ -114,6 +155,22 @@ def run_hardeneks(
False,
"--insecure-skip-tls-verify",
),
pillars: str = typer.Option(
default=None,
help="Specific pillars to harden. Default is all pillars.",
),
run_only_cluster_level_checks: bool = typer.Option(
False,
"--run_only_cluster_level_checks",
),
run_only_namespace_level_checks: bool = typer.Option(
False,
"--run_only_namespace_level_checks",
),
debug: bool = typer.Option(
False,
"--debug",
),
):
"""
Main entry point to hardeneks.
Expand All @@ -132,16 +189,14 @@ def run_hardeneks(
None

"""

(context, cluster) = _get_current_context(context, cluster)

if insecure_skip_tls_verify:
_load_kube_config()
else:
kubernetes.config.load_kube_config(context=context)

context = _get_current_context(context)

if not cluster:
cluster = _get_cluster_name(context, region)

if not region:
region = _get_region()

Expand All @@ -158,26 +213,40 @@ def run_hardeneks(
if not namespace:
namespaces = _get_namespaces(config["ignore-namespaces"])
else:
namespaces = [namespace]
#namespaces = [namespace]
namespaces = namespace.split(',')

if not pillars:
pillarsList = _get_pillars()
else:
#namespaces = [namespace]
pillarsList = pillars.split(',')


rules = config["rules"]

resources = Resources(region, context, cluster, namespaces, debug)
resources.set_resources()
cluster_data(resources, rules, "cluster_wide")


console.rule("[b]Checking cluster wide rules", characters="- ")
console.print()

resources = Resources(region, context, cluster, namespaces)
resources.set_resources()
harden(resources, rules, "cluster_wide")

for ns in namespaces:
console.rule(
f"[b]Checking rules against namespace: {ns}", characters=" -"
)
console.print()
resources = NamespacedResources(region, context, cluster, ns)
resources.set_resources()
harden(resources, rules, "namespace_based")
console.print()
if not run_only_namespace_level_checks:
#resources = Resources(region, context, cluster, namespaces, debug)
#resources.set_resources()
harden(resources, rules, "cluster_wide", pillarsList)

if not run_only_cluster_level_checks:
for ns in namespaces:
#console.rule(f"[b]Checking rules against namespace: {ns}", characters=" -")
#console.print()
resources = NamespacedResources(region, context, cluster, ns, debug)
resources.set_resources()
harden(resources, rules, "namespace_based", pillarsList)
console.print()


if export_txt:
console.save_text(export_txt)
Expand Down
Empty file.
Loading