From d9f9d76fad091f278000480fbc37e60b11dd97d1 Mon Sep 17 00:00:00 2001 From: Mahdi Ramezani Date: Wed, 14 Jan 2026 02:09:31 +0000 Subject: [PATCH 1/3] Modified the wildcard matching logic for client cert CN names. Signed-off-by: Mahdi Ramezani --- go-server-server/go/auth.go | 41 +++++++++++++++++++--------- test/test_restapi.py | 54 ++++++++++++++++++++++++++----------- 2 files changed, 68 insertions(+), 27 deletions(-) diff --git a/go-server-server/go/auth.go b/go-server-server/go/auth.go index 502fde7..b6346fc 100644 --- a/go-server-server/go/auth.go +++ b/go-server-server/go/auth.go @@ -9,24 +9,41 @@ import ( func CommonNameMatch(r *http.Request) bool { // During client cert authentication, after the certificate chain is validated by // TLS, here we will further check if at least one of the common names in the end-entity certificate - // matches one of the trusted common names of the server config. - for _, peercert := range r.TLS.PeerCertificates { - commonName := peercert.Subject.CommonName - log.Printf("info: CommonName in the client cert: %s", commonName) - for _, name := range trustedCertCommonNames { - if strings.HasPrefix(name, "*") { + // matches one of the trusted common names in the server config. + + for _, name := range trustedCertCommonNames { + is_wildcard := false + dots_in_trusted_cn := 0 + domain := name + if strings.HasPrefix(name, "*.") { + if len(name) < 3 { + log.Printf("warning: Skipping invalid trusted common name %s", name) + continue; + } + is_wildcard = true + domain = name[1:] //strip "*" but keep the "." at the beginning + dots_in_trusted_cn = strings.Count(domain, ".") + } else if strings.HasPrefix(name, "*") { + log.Printf("warning: Skipping invalid trusted common name %s", name) + continue; + } + for _, peercert := range r.TLS.PeerCertificates { + commonName := peercert.Subject.CommonName + if is_wildcard { // wildcard common name matching - domain := name[1:] //strip "*" - if strings.HasSuffix(commonName, domain) { - log.Printf("info: CommonName %s in the client cert matches trusted wildcard common name %s", commonName, name) + if len(commonName) > len(domain) && strings.HasSuffix(commonName, domain) && dots_in_trusted_cn == strings.Count(commonName, ".") { + log.Printf("info: Wildcard match between common name %s in the client cert and trusted common name %s", commonName, name) + return true; + } + } else { + if commonName == name { + log.Printf("info: Exact match with trusted common name %s", name) return true; } - } else if commonName == name { - return true; } } } - log.Printf("error: Authentication Fail! None of the CommonNames in the client cert match any of the trusted common names") + log.Printf("error: Authentication Fail! None of the common names in the client cert match any of the trusted common names") return false; } \ No newline at end of file diff --git a/test/test_restapi.py b/test/test_restapi.py index 607b965..f74cdfe 100644 --- a/test/test_restapi.py +++ b/test/test_restapi.py @@ -69,12 +69,24 @@ def test_exact_match_success(self, setup_restapi_client): r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 200 - def test_exact_match_failure(self, setup_restapi_client): + def test_exact_match_failure_1(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("client.restapi.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 + def test_exact_match_failure_2(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("test.client.restapi.com") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + def test_exact_match_failure_3(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("sub.test.client.restapi.sonic") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + # Wildcard match tests for "*.example.sonic" def test_wildcard_match_success_1(self, setup_restapi_client): @@ -85,67 +97,79 @@ def test_wildcard_match_success_1(self, setup_restapi_client): def test_wildcard_match_success_2(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("another.test.example.sonic") as client_cert: + with ClientCert("a.example.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 200 def test_wildcard_match_failure_1(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("example.sonic") as client_cert: + with ClientCert("sub.test.example.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_2(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("test.example") as client_cert: + with ClientCert("example.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_3(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("test.example.com") as client_cert: + with ClientCert("test.example") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_4(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("someexample.sonic") as client_cert: + with ClientCert("test.example.com") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_5(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("someexample.sonic") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + def test_wildcard_match_failure_6(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("test.example.sonic.com") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 + def test_wildcard_match_failure_7(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert(".example.sonic") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + # Wildcard match tests for "*test.sonic" - def test_wildcard_match_success_a(self, setup_restapi_client): + def test_wildcard_match_failure_a(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("mytest.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) - assert r.status_code == 200 + assert r.status_code == 401 - def test_wildcard_match_success_b(self, setup_restapi_client): + def test_wildcard_match_failure_b(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("test.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) - assert r.status_code == 200 + assert r.status_code == 401 - def test_wildcard_match_success_c(self, setup_restapi_client): + def test_wildcard_match_failure_c(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("example.test.sonic") as client_cert: + with ClientCert("sub.test.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) - assert r.status_code == 200 + assert r.status_code == 401 - def test_wildcard_match_failure_a(self, setup_restapi_client): + def test_wildcard_match_failure_d(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("est.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - def test_wildcard_match_failure_b(self, setup_restapi_client): + def test_wildcard_match_failure_e(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("test.sonico") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) From 824233894af26980f274cc1a82f8a86ca4e4acfd Mon Sep 17 00:00:00 2001 From: Mahdi Ramezani Date: Wed, 14 Jan 2026 02:32:17 +0000 Subject: [PATCH 2/3] Fixed indentation. Signed-off-by: Mahdi Ramezani --- go-server-server/go/auth.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go-server-server/go/auth.go b/go-server-server/go/auth.go index b6346fc..cd3598a 100644 --- a/go-server-server/go/auth.go +++ b/go-server-server/go/auth.go @@ -7,8 +7,8 @@ import ( ) func CommonNameMatch(r *http.Request) bool { - // During client cert authentication, after the certificate chain is validated by - // TLS, here we will further check if at least one of the common names in the end-entity certificate + // During client cert authentication, after the certificate chain is validated by + // TLS, here we will further check if at least one of the common names in the end-entity certificate // matches one of the trusted common names in the server config. for _, name := range trustedCertCommonNames { @@ -44,6 +44,6 @@ func CommonNameMatch(r *http.Request) bool { } } - log.Printf("error: Authentication Fail! None of the common names in the client cert match any of the trusted common names") - return false; + log.Printf("error: Authentication Fail! None of the common names in the client cert match any of the trusted common names") + return false; } \ No newline at end of file From 010d5dc5bcec8ffc97fb7d893f9bfa6113366b8a Mon Sep 17 00:00:00 2001 From: Mahdi Ramezani Date: Thu, 15 Jan 2026 23:29:03 +0000 Subject: [PATCH 3/3] Multi-level subdomains are now accepted for wildcard matches. Added new unit tests. Signed-off-by: Mahdi Ramezani --- go-server-server/go/auth.go | 4 +- supervisor/rest_api_test.conf | 2 +- test/test_restapi.py | 74 ++++++++++++++++++++++++++++------- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/go-server-server/go/auth.go b/go-server-server/go/auth.go index cd3598a..21c9e1b 100644 --- a/go-server-server/go/auth.go +++ b/go-server-server/go/auth.go @@ -13,7 +13,6 @@ func CommonNameMatch(r *http.Request) bool { for _, name := range trustedCertCommonNames { is_wildcard := false - dots_in_trusted_cn := 0 domain := name if strings.HasPrefix(name, "*.") { if len(name) < 3 { @@ -22,7 +21,6 @@ func CommonNameMatch(r *http.Request) bool { } is_wildcard = true domain = name[1:] //strip "*" but keep the "." at the beginning - dots_in_trusted_cn = strings.Count(domain, ".") } else if strings.HasPrefix(name, "*") { log.Printf("warning: Skipping invalid trusted common name %s", name) continue; @@ -31,7 +29,7 @@ func CommonNameMatch(r *http.Request) bool { commonName := peercert.Subject.CommonName if is_wildcard { // wildcard common name matching - if len(commonName) > len(domain) && strings.HasSuffix(commonName, domain) && dots_in_trusted_cn == strings.Count(commonName, ".") { + if len(commonName) > len(domain) && strings.HasSuffix(commonName, domain) { log.Printf("info: Wildcard match between common name %s in the client cert and trusted common name %s", commonName, name) return true; } diff --git a/supervisor/rest_api_test.conf b/supervisor/rest_api_test.conf index 3afccb6..d056744 100644 --- a/supervisor/rest_api_test.conf +++ b/supervisor/rest_api_test.conf @@ -1,5 +1,5 @@ [program:rest-api] -command=/usr/sbin/go-server-server.test -test.coverprofile=/coverage.cov -systemtest=true -enablehttps=true -clientcert=/usr/sbin/cert/client/selfsigned.crt -servercert=/usr/sbin/cert/server/selfsigned.crt -serverkey=/usr/sbin/cert/server/selfsigned.key -localapitestdocker=true -clientcertcommonname=test.client.restapi.sonic,*.example.sonic,*test.sonic -loglevel trace +command=/usr/sbin/go-server-server.test -test.coverprofile=/coverage.cov -systemtest=true -enablehttps=true -clientcert=/usr/sbin/cert/client/selfsigned.crt -servercert=/usr/sbin/cert/server/selfsigned.crt -serverkey=/usr/sbin/cert/server/selfsigned.key -localapitestdocker=true -clientcertcommonname=test.client.restapi.sonic,*.example.sonic,*test.sonic,*. -loglevel trace priority=1 autostart=false autorestart=false diff --git a/test/test_restapi.py b/test/test_restapi.py index f74cdfe..a23597d 100644 --- a/test/test_restapi.py +++ b/test/test_restapi.py @@ -61,7 +61,7 @@ def __exit__(self, exc_type, exc_value, traceback): class TestClientCertAuth: - # Exact match tests for "test.client.restapi.sonic" + # Exact matching tests for "test.client.restapi.sonic" def test_exact_match_success(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client @@ -87,7 +87,13 @@ def test_exact_match_failure_3(self, setup_restapi_client): r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - # Wildcard match tests for "*.example.sonic" + def test_exact_match_failure_4(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("TEST.CLIENT.RESTAPI.SONIC") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + # Wildcard matching tests for "*.example.sonic" def test_wildcard_match_success_1(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client @@ -101,80 +107,118 @@ def test_wildcard_match_success_2(self, setup_restapi_client): r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 200 - def test_wildcard_match_failure_1(self, setup_restapi_client): + def test_wildcard_match_success_3(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("sub.test.example.sonic") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 200 + + def test_wildcard_match_success_4(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("TEST.example.sonic") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 200 + + def test_wildcard_match_failure_1(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("example.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_2(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("example.sonic") as client_cert: + with ClientCert("test.example") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_3(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("test.example") as client_cert: + with ClientCert("test.example.com") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_4(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("test.example.com") as client_cert: + with ClientCert("someexample.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_5(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("someexample.sonic") as client_cert: + with ClientCert("test.example.sonic.com") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_6(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert("test.example.sonic.com") as client_cert: + with ClientCert(".example.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 def test_wildcard_match_failure_7(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client - with ClientCert(".example.sonic") as client_cert: + with ClientCert("TEST.EXAMPLE.SONIC") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - # Wildcard match tests for "*test.sonic" + # Matching tests for "*test.sonic" (invalid CN) - def test_wildcard_match_failure_a(self, setup_restapi_client): + def test_invalid_match_failure_1(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("mytest.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - def test_wildcard_match_failure_b(self, setup_restapi_client): + def test_invalid_match_failure_2(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("test.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - def test_wildcard_match_failure_c(self, setup_restapi_client): + def test_invalid_match_failure_3(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("sub.test.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - def test_wildcard_match_failure_d(self, setup_restapi_client): + def test_invalid_match_failure_4(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("est.sonic") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 - def test_wildcard_match_failure_e(self, setup_restapi_client): + def test_invalid_match_failure_5(self, setup_restapi_client): _, _, _, restapi_client = setup_restapi_client with ClientCert("test.sonico") as client_cert: r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) assert r.status_code == 401 + # Corner cases + + def test_empty_cn(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + def test_missing_tld(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("test.example.") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + def test_match_all(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("*") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + + def test_ends_with_dot(self, setup_restapi_client): + _, _, _, restapi_client = setup_restapi_client + with ClientCert("*.") as client_cert: + r = restapi_client.get_heartbeat(client_cert=(client_cert.cert, client_cert.key)) + assert r.status_code == 401 + class TestRestApiPositive: """Normal behaviour tests"""