From e8520196873d8b22548003e739b7e3e93798bf2c Mon Sep 17 00:00:00 2001 From: amirshabanics Date: Fri, 20 Dec 2024 01:22:25 +0330 Subject: [PATCH] Add Fragment to VLESS Configuration in WS - Use ruff to format the code - Export inbount to `0.0.0.0` - Change some fields to None for being more simply and smaller config - Change == or != None to is or is not None - Add Fragment to VLESS WS (I test and use it. if we aggree we can implement for all configs too) - Update WS VLESS config with the new version --- v2ray2json.py | 570 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 345 insertions(+), 225 deletions(-) diff --git a/v2ray2json.py b/v2ray2json.py index f468e36..91afb92 100644 --- a/v2ray2json.py +++ b/v2ray2json.py @@ -81,7 +81,9 @@ class LogBean: loglevel: str dnsLog: bool - def __init__(self, access: str, error: str, loglevel: str, dnsLog: bool) -> None: + def __init__( + self, access: str, error: str, loglevel: str, dnsLog: bool + ) -> None: self.access = access self.error = error self.loglevel = loglevel @@ -93,13 +95,19 @@ class SniffingBean: enabled: bool destOverride: list[str] # str metadataOnly: bool + routeOnly: bool def __init__( - self, enabled: bool, destOverride: list[str], metadataOnly: bool + self, + enabled: bool, + destOverride: list[str], + metadataOnly: bool, + routeOnly: bool = None, ) -> None: self.enabled = enabled self.destOverride = destOverride self.metadataOnly = metadataOnly + self.routeOnly = routeOnly class InSettingsBean: auth: str = None @@ -161,17 +169,17 @@ class VnextBean: class UsersBean: id: str = "" alterId: int = None - security: str = DEFAULT_SECURITY - level: int = DEFAULT_LEVEL + security: str = None + level: int = None encryption: str = "" - flow: str = "" + flow: str = None def __init__( self, id: str = "", alterId: int = None, - security: str = DEFAULT_SECURITY, - level: int = DEFAULT_LEVEL, + security: str = None, + level: int = None, encryption: str = "", flow: str = "", ) -> None: @@ -204,7 +212,10 @@ class SocksUsersBean: level: int = DEFAULT_LEVEL def __init__( - self, user: str = "", _pass: str = "", level: int = DEFAULT_LEVEL + self, + user: str = "", + _pass: str = "", + level: int = DEFAULT_LEVEL, ) -> None: self.user = user self._pass = _pass @@ -255,10 +266,24 @@ class WireGuardBean: publicKey: str = "" endpoint: str = "" - def __init__(self, publicKey: str = "", endpoint: str = "") -> None: + def __init__( + self, publicKey: str = "", endpoint: str = "" + ) -> None: self.publicKey = publicKey self.endpoint = endpoint + class FragmentBean: + interval: str + length: str + packets: str + + def __init__( + self, interval: str, length: str, packets: str + ) -> None: + self.interval = interval + self.length = length + self.packets = packets + vnext: list[VnextBean] = None # VnextBean servers: list[ServersBean] = None # ServersBean response: Response = None @@ -271,6 +296,7 @@ def __init__(self, publicKey: str = "", endpoint: str = "") -> None: inboundTag: str = None secretKey: str = None peers: list[WireGuardBean] = None # WireGuardBean + fragment: FragmentBean = None # FragmentBean def __init__( self, @@ -286,6 +312,7 @@ def __init__( inboundTag: str = None, secretKey: str = None, peers: list[WireGuardBean] = None, + fragment: FragmentBean = None, ) -> None: self.vnext = vnext self.servers = servers @@ -299,6 +326,7 @@ def __init__( self.inboundTag = inboundTag self.secretKey = secretKey self.peers = peers + self.fragment = fragment class StreamSettingsBean: class TcpSettingsBean: @@ -404,14 +432,8 @@ def __init__( self.seed = seed class WsSettingsBean: - class HeadersBean: - Host: str = "" - - def __init__(self, Host: str = "") -> None: - self.Host = Host - path: str = "" - headers: HeadersBean = HeadersBean() + host: str = "" maxEarlyData: int = None useBrowserForwarding: bool = None acceptProxyProtocol: bool = None @@ -419,13 +441,13 @@ def __init__(self, Host: str = "") -> None: def __init__( self, path: str = "", - headers: HeadersBean = HeadersBean(), + host: str = "", maxEarlyData: int = None, useBrowserForwarding: bool = None, acceptProxyProtocol: bool = None, ) -> None: self.path = path - self.headers = headers + self.host = host self.maxEarlyData = maxEarlyData self.useBrowserForwarding = useBrowserForwarding self.acceptProxyProtocol = acceptProxyProtocol @@ -439,7 +461,7 @@ def __init__(self, host: list[str] = [], path: str = "") -> None: self.path = path class TlsSettingsBean: - allowInsecure: bool = False + allowInsecure: bool = None serverName: str = "" alpn: list[str] = None # str minVersion: str = None @@ -450,14 +472,14 @@ class TlsSettingsBean: certificates: list[any] = None # any disableSystemRoot: bool = None enableSessionResumption: bool = None - show: bool = False + show: bool = None publicKey: str = None shortId: str = None spiderX: str = None def __init__( self, - allowInsecure: bool = False, + allowInsecure: bool = None, serverName: str = "", alpn: list[str] = None, minVersion: str = None, @@ -468,7 +490,7 @@ def __init__( certificates: list[any] = None, disableSystemRoot: bool = None, enableSessionResumption: bool = None, - show: bool = False, + show: bool = None, publicKey: str = None, shortId: str = None, spiderX: str = None, @@ -514,10 +536,22 @@ class GrpcSettingsBean: serviceName: str = "" multiMode: bool = None - def __init__(self, serviceName: str = "", multiMode: bool = None) -> None: + def __init__( + self, serviceName: str = "", multiMode: bool = None + ) -> None: self.serviceName = serviceName self.multiMode = multiMode + class SocktOptSettingsBean: + dialerProxy: str = None + tcpNoDelay: bool = None + + def __init__( + self, dialerProxy: str = None, tcpNoDelay: bool = None + ) -> None: + self.dialerProxy = dialerProxy + self.tcpNoDelay = tcpNoDelay + network: str = DEFAULT_NETWORK security: str = "" tcpSettings: TcpSettingsBean = None @@ -529,7 +563,7 @@ def __init__(self, serviceName: str = "", multiMode: bool = None) -> None: realitySettings: TlsSettingsBean = None grpcSettings: GrpcSettingsBean = None dsSettings: any = None - sockopt: any = None + sockopt: SocktOptSettingsBean = None def __init__( self, @@ -544,7 +578,7 @@ def __init__( realitySettings: TlsSettingsBean = None, grpcSettings: GrpcSettingsBean = None, dsSettings: any = None, - sockopt: any = None, + sockopt: SocktOptSettingsBean = None, ) -> None: self.network = network self.security = security @@ -559,6 +593,13 @@ def __init__( self.dsSettings = dsSettings self.sockopt = sockopt + def populateFragmentSettings( + self, fragment_tag: str = "fragment" + ) -> None: + self.sockopt = self.SocktOptSettingsBean( + dialerProxy=fragment_tag, tcpNoDelay=None + ) + def populateTransportSettings( self, transport: str, @@ -578,11 +619,15 @@ def populateTransportSettings( if headerType == HTTP: tcpSetting.header.type = HTTP if host != "" or path != "": - requestObj = self.TcpSettingsBean.HeaderBean.RequestBean() + requestObj = ( + self.TcpSettingsBean.HeaderBean.RequestBean() + ) requestObj.headers.Host = ( - "" if host == None else host.split(",") + "" if host is None else host.split(",") + ) + requestObj.path = ( + "" if path is None else path.split(",") ) - requestObj.path = "" if path == None else path.split(",") tcpSetting.header.request = requestObj sni = ( requestObj.headers.Host[0] @@ -596,8 +641,10 @@ def populateTransportSettings( elif self.network == "kcp": kcpsetting = self.KcpSettingsBean() - kcpsetting.header.type = headerType if headerType != None else "none" - if seed == None or seed == "": + kcpsetting.header.type = ( + headerType if headerType is not None else "none" + ) + if seed is not None or seed == "": kcpsetting.seed = None else: kcpsetting.seed = seed @@ -605,31 +652,37 @@ def populateTransportSettings( elif self.network == "ws": wssetting = self.WsSettingsBean() - wssetting.headers.Host = host if host != None else "" - sni = wssetting.headers.Host - wssetting.path = path if path != None else "/" + wssetting.host = host if host is not None else "" + sni = wssetting.host + wssetting.path = path if path is not None else "/" self.wsSettings = wssetting elif self.network == "h2" or self.network == "http": network = "h2" h2Setting = self.HttpSettingsBean() - h2Setting.host = "" if host == None else host.split(",") + h2Setting.host = "" if host is None else host.split(",") sni = h2Setting.host[0] if len(h2Setting.host) > 0 else sni - h2Setting.path = path if path != None else "/" + h2Setting.path = path if path is not None else "/" self.httpSettings = h2Setting elif self.network == "quic": quicsetting = self.QuicSettingBean() - quicsetting.security = quicSecurity if quicSecurity != None else "none" - quicsetting.key = key if key != None else "" - quicsetting.header.type = headerType if headerType != None else "none" + quicsetting.security = ( + quicSecurity if quicSecurity is not None else "none" + ) + quicsetting.key = key if key is not None else "" + quicsetting.header.type = ( + headerType if headerType is not None else "none" + ) self.quicSettings = quicsetting elif self.network == "grpc": grpcSetting = self.GrpcSettingsBean() grpcSetting.multiMode = mode == "multi" - grpcSetting.serviceName = serviceName if serviceName != None else "" - sni = host if host != None else "" + grpcSetting.serviceName = ( + serviceName if serviceName is not None else "" + ) + sni = host if host is not None else "" self.grpcSettings = grpcSetting return sni @@ -643,17 +696,19 @@ def populateTlsSettings( alpns: str, publicKey: str, shortId: str, - spiderX: str + spiderX: str, ): self.security = streamSecurity tlsSetting = self.TlsSettingsBean( - allowInsecure = allowInsecure, - serverName = sni, - fingerprint = fingerprint, - alpn = None if alpns == None or alpns == "" else alpns.split(","), - publicKey = publicKey, - shortId = shortId, - spiderX = spiderX + allowInsecure=allowInsecure, + serverName=sni, + fingerprint=fingerprint, + alpn=( + None if alpns is None or alpns == "" else alpns.split(",") + ), + publicKey=publicKey, + shortId=shortId, + spiderX=spiderX, ) if self.security == TLS: @@ -815,7 +870,9 @@ class FakednsBean: ipPool: str = "198.18.0.0/15" poolSize: int = 10000 - def __init__(self, ipPool: str = "198.18.0.0/15", poolSize: int = 10000) -> None: + def __init__( + self, ipPool: str = "198.18.0.0/15", poolSize: int = 10000 + ) -> None: self.ipPool = ipPool self.poolSize = poolSize @@ -977,80 +1034,80 @@ def remove_nulls(d): def get_log(): - log = LogBean(access = "", error = "", loglevel = "error", dnsLog = False) + log = LogBean(access="", error="", loglevel="error", dnsLog=False) return log def get_inbound(): inbound = InboundBean( - tag = "in_proxy", - port = 1080, - protocol = EConfigType.SOCKS.protocolName, - listen = "127.0.0.1", - settings = InboundBean.InSettingsBean( - auth = "noauth", - udp = True, - userLevel = 8, + tag="socks", + port=1080, + protocol=EConfigType.SOCKS.protocolName, + listen="0.0.0.0", + settings=InboundBean.InSettingsBean( + auth="noauth", + udp=True, ), - sniffing = InboundBean.SniffingBean( - enabled = False, - destOverride = None, - metadataOnly = None, + sniffing=InboundBean.SniffingBean( + enabled=False, + destOverride=["http", "tls", "quic", "fakedns"], + metadataOnly=None, + routeOnly=True, ), - streamSettings = None, - allocate = None, + streamSettings=None, + allocate=None, ) return inbound def get_outbound_vmess(): outbound = OutboundBean( - protocol = EConfigType.VMESS.protocolName, - settings = OutboundBean.OutSettingsBean( - vnext = [ + protocol=EConfigType.VMESS.protocolName, + settings=OutboundBean.OutSettingsBean( + vnext=[ OutboundBean.OutSettingsBean.VnextBean( - users = [OutboundBean.OutSettingsBean.VnextBean.UsersBean()], + users=[OutboundBean.OutSettingsBean.VnextBean.UsersBean()], ), ] ), - streamSettings = OutboundBean.StreamSettingsBean(), + streamSettings=OutboundBean.StreamSettingsBean(), ) return outbound def get_outbound_vless(): outbound = OutboundBean( - protocol = EConfigType.VLESS.protocolName, - settings = OutboundBean.OutSettingsBean( - vnext = [ + protocol=EConfigType.VLESS.protocolName, + settings=OutboundBean.OutSettingsBean( + vnext=[ OutboundBean.OutSettingsBean.VnextBean( - users = [OutboundBean.OutSettingsBean.VnextBean.UsersBean()], + users=[OutboundBean.OutSettingsBean.VnextBean.UsersBean()], ), ] ), - streamSettings = OutboundBean.StreamSettingsBean(), + streamSettings=OutboundBean.StreamSettingsBean(), ) return outbound def get_outbound_trojan(): outbound = OutboundBean( - protocol = EConfigType.TROJAN.protocolName, - settings = OutboundBean.OutSettingsBean( - servers = [OutboundBean.OutSettingsBean.ServersBean()] + protocol=EConfigType.TROJAN.protocolName, + settings=OutboundBean.OutSettingsBean( + servers=[OutboundBean.OutSettingsBean.ServersBean()] ), - streamSettings = OutboundBean.StreamSettingsBean(), + streamSettings=OutboundBean.StreamSettingsBean(), ) return outbound def get_outbound_ss(): outbound = OutboundBean( - protocol = "shadowsocks", - settings = OutboundBean.OutSettingsBean( - servers = [OutboundBean.OutSettingsBean.ServersBean()] + protocol="shadowsocks", + settings=OutboundBean.OutSettingsBean( + servers=[OutboundBean.OutSettingsBean.ServersBean()] ), - streamSettings = OutboundBean.StreamSettingsBean(), + streamSettings=OutboundBean.StreamSettingsBean(), ) return outbound @@ -1067,7 +1124,9 @@ def try_resolve_resolve_sip002(str: str, config: OutboundBean): method = arr_user_info[0] password = unquote(arr_user_info[1]) else: - base64_decode = base64.b64decode(uri.username).decode(encoding = "utf-8", errors = "ignore") + base64_decode = base64.b64decode(uri.username).decode( + encoding="utf-8", errors="ignore" + ) arr_user_info = list(map(str.strip, base64_decode.split(":"))) if len(arr_user_info) < 2: return False @@ -1087,41 +1146,67 @@ def try_resolve_resolve_sip002(str: str, config: OutboundBean): def get_outbound1(): outbound1 = OutboundBean( - tag = "direct", - protocol = EConfigType.FREEDOM.protocolName, - settings = OutboundBean.OutSettingsBean( - domainStrategy = DomainStrategy.UseIp, + tag="direct", + protocol=EConfigType.FREEDOM.protocolName, + settings=OutboundBean.OutSettingsBean( + domainStrategy=DomainStrategy.UseIp, ), - mux = None, + mux=None, ) return outbound1 def get_outbound2(): outbound2 = OutboundBean( - tag = "blackhole", - protocol = EConfigType.BLACKHOLE.protocolName, - settings = OutboundBean.OutSettingsBean(), - mux = None, + tag="blackhole", + protocol=EConfigType.BLACKHOLE.protocolName, + settings=OutboundBean.OutSettingsBean(), + mux=None, ) return outbound2 +def get_fragment_outbound( + *, + interval: str, + length: str, + packets: str, + tag: str = "fragment", +) -> OutboundBean: + return OutboundBean( + tag=tag, + protocol=EConfigType.FREEDOM.protocolName, + mux=None, + streamSettings=OutboundBean.StreamSettingsBean( + security=None, + network=None, + sockopt=OutboundBean.StreamSettingsBean.SocktOptSettingsBean( + tcpNoDelay=True + ), + ), + settings=OutboundBean.OutSettingsBean( + fragment=OutboundBean.OutSettingsBean.FragmentBean( + interval=interval, packets=packets, length=length + ) + ), + ) + + def get_dns(dns_list=["8.8.8.8"]): if isinstance(dns_list, str): if "," in dns_list: dns_list = dns_list.split(",") - dns = DnsBean(servers = dns_list) + dns = DnsBean(servers=dns_list) return dns def get_routing(): - routing = RoutingBean(domainStrategy = DomainStrategy.UseIp) + routing = RoutingBean(domainStrategy=DomainStrategy.UseIp) return routing -def generateConfig(config: str, dns_list = ["8.8.8.8"]): +def generateConfig(config: str, dns_list=["8.8.8.8"]): allowInsecure = True @@ -1135,10 +1220,14 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): if _len % 4 > 0: raw_config += "=" * (4 - _len % 4) - b64decode = base64.b64decode(raw_config).decode(encoding = "utf-8", errors = "ignore") - _json = json.loads(b64decode, strict = False) + b64decode = base64.b64decode(raw_config).decode( + encoding="utf-8", errors="ignore" + ) + _json = json.loads(b64decode, strict=False) - vmessQRCode_attributes = list(VmessQRCode.__dict__["__annotations__"].keys()) + vmessQRCode_attributes = list( + VmessQRCode.__dict__["__annotations__"].keys() + ) for key in list(_json.keys()): if key not in vmessQRCode_attributes: del _json[key] @@ -1149,56 +1238,65 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): vnext = outbound.settings.vnext[0] vnext.address = vmessQRCode.add - vnext.port = int(vmessQRCode.port) if vmessQRCode.port.isdigit() else DEFAULT_PORT + vnext.port = ( + int(vmessQRCode.port) + if vmessQRCode.port.isdigit() + else DEFAULT_PORT + ) user = vnext.users[0] user.id = vmessQRCode.id - user.security = vmessQRCode.scy if vmessQRCode.scy != "" else DEFAULT_SECURITY - user.alterId = int(vmessQRCode.aid) if vmessQRCode.aid.isdigit() else None + user.security = ( + vmessQRCode.scy if vmessQRCode.scy != "" else DEFAULT_SECURITY + ) + user.alterId = ( + int(vmessQRCode.aid) if vmessQRCode.aid.isdigit() else None + ) streamSetting = outbound.streamSettings sni = streamSetting.populateTransportSettings( - transport = vmessQRCode.net, - headerType = vmessQRCode.type, - host = vmessQRCode.host, - path = vmessQRCode.path, - seed = vmessQRCode.path, - quicSecurity = vmessQRCode.host, - key = vmessQRCode.path, - mode = vmessQRCode.type, - serviceName = vmessQRCode.path, + transport=vmessQRCode.net, + headerType=vmessQRCode.type, + host=vmessQRCode.host, + path=vmessQRCode.path, + seed=vmessQRCode.path, + quicSecurity=vmessQRCode.host, + key=vmessQRCode.path, + mode=vmessQRCode.type, + serviceName=vmessQRCode.path, ) - fingerprint = vmessQRCode.fp if vmessQRCode.fp else streamSetting.tlsSettings.fingerprint if streamSetting.tlsSettings else None + fingerprint = ( + vmessQRCode.fp + if vmessQRCode.fp + else ( + streamSetting.tlsSettings.fingerprint + if streamSetting.tlsSettings + else None + ) + ) streamSetting.populateTlsSettings( - streamSecurity = vmessQRCode.tls, - allowInsecure = allowInsecure, - sni = sni if vmessQRCode.sni == "" else vmessQRCode.sni, - fingerprint = fingerprint, - alpns = vmessQRCode.alpn, - publicKey = None, - shortId = None, - spiderX = None + streamSecurity=vmessQRCode.tls, + allowInsecure=allowInsecure, + sni=sni if vmessQRCode.sni == "" else vmessQRCode.sni, + fingerprint=fingerprint, + alpns=vmessQRCode.alpn, + publicKey=None, + shortId=None, + spiderX=None, ) v2rayConfig = V2rayConfig( - _comment = Comment(remark = vmessQRCode.ps), - log = get_log(), - inbounds = [get_inbound()], - outbounds = [outbound, get_outbound1(), get_outbound2()], - dns = get_dns(dns_list=dns_list), - routing = get_routing(), + _comment=Comment(remark=vmessQRCode.ps), + log=get_log(), + inbounds=[get_inbound()], + outbounds=[outbound, get_outbound1(), get_outbound2()], + dns=get_dns(dns_list=dns_list), + routing=get_routing(), ) - v2rayConfig_str_json = json.dumps(v2rayConfig, default=vars) - - res = json.loads(v2rayConfig_str_json) - res = remove_nulls(res) - - return json.dumps(res) - elif protocol == EConfigType.VLESS.protocolName: parsed_url = urlparse(config) @@ -1214,10 +1312,21 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): for k, v in parse_qs(parsed_url.query).items() ) + fragment_config = netquery.get("fragment", None) outbound = get_outbound_vless() - streamSetting = outbound.streamSettings - fingerprint = netquery.get('fp') if 'fp' in netquery else streamSetting.tlsSettings.fingerprint if streamSetting.tlsSettings else None + streamSetting: OutboundBean.StreamSettingsBean = ( + outbound.streamSettings + ) + fingerprint = ( + netquery.get("fp") + if "fp" in netquery + else ( + streamSetting.tlsSettings.fingerprint + if streamSetting.tlsSettings + else None + ) + ) vnext = outbound.settings.vnext[0] vnext.address = hostname @@ -1226,46 +1335,52 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): user = vnext.users[0] user.id = uid user.encryption = netquery.get("encryption", "none") - user.flow = netquery.get("flow", "") + user.flow = netquery.get("flow", None) sni = streamSetting.populateTransportSettings( - transport = netquery.get("type", "tcp"), - headerType = netquery.get("headerType", None), - host = netquery.get("host", None), - path = netquery.get("path", None), - seed = netquery.get("seed", None), - quicSecurity = netquery.get("quicSecurity", None), - key = netquery.get("key", None), - mode = netquery.get("mode", None), - serviceName = netquery.get("serviceName", None), + transport=netquery.get("type", "tcp"), + headerType=netquery.get("headerType", None), + host=netquery.get("host", None), + path=netquery.get("path", None), + seed=netquery.get("seed", None), + quicSecurity=netquery.get("quicSecurity", None), + key=netquery.get("key", None), + mode=netquery.get("mode", None), + serviceName=netquery.get("serviceName", None), ) streamSetting.populateTlsSettings( - streamSecurity = netquery.get("security", ""), - allowInsecure = allowInsecure, - sni = sni if netquery.get("sni", None) == None else netquery.get("sni", None), - fingerprint = fingerprint, - alpns = netquery.get("alpn", None), - publicKey = netquery.get("pbk", ""), - shortId = netquery.get("sid", ""), - spiderX = netquery.get("spx", ""), + streamSecurity=netquery.get("security", None), + allowInsecure=allowInsecure, + sni=( + sni + if netquery.get("sni", None) == None + else netquery.get("sni", None) + ), + fingerprint=fingerprint, + alpns=netquery.get("alpn", None), + publicKey=netquery.get("pbk", None), + shortId=netquery.get("sid", None), + spiderX=netquery.get("spx", None), ) + outbounds = [outbound, get_outbound1(), get_outbound2()] + if fragment_config is not None: + streamSetting.populateFragmentSettings() + length, inteval, packets = fragment_config.split(",") + outbounds.append( + get_fragment_outbound( + interval=inteval, length=length, packets=packets + ) + ) v2rayConfig = V2rayConfig( - _comment = Comment(remark = name), - log = get_log(), - inbounds = [get_inbound()], - outbounds = [outbound, get_outbound1(), get_outbound2()], - dns = get_dns(dns_list = dns_list), - routing = get_routing(), + _comment=Comment(remark=name), + log=get_log(), + inbounds=[get_inbound()], + outbounds=outbounds, + dns=get_dns(dns_list=dns_list), + routing=get_routing(), ) - v2rayConfig_str_json = json.dumps(v2rayConfig, default = vars) - - res = json.loads(v2rayConfig_str_json) - res = remove_nulls(res) - - return json.dumps(res) - elif protocol == EConfigType.TROJAN.protocolName: parsed_url = urlparse(config) @@ -1275,7 +1390,7 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): uid = _netloc[0] hostname = _netloc[1].rsplit(":", 1)[0] port = int(_netloc[1].rsplit(":", 1)[1]) - + netquery = dict( (k, v if len(v) > 1 else v[0]) for k, v in parse_qs(parsed_url.query).items() @@ -1294,40 +1409,44 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): if len(netquery) > 0: sni = streamSetting.populateTransportSettings( - transport = netquery.get("type", "tcp"), - headerType = netquery.get("headerType", None), - host = netquery.get("host", None), - path = netquery.get("path", None), - seed = netquery.get("seed", None), - quicSecurity = netquery.get("quicSecurity", None), - key = netquery.get("key", None), - mode = netquery.get("mode", None), - serviceName = netquery.get("serviceName", None), + transport=netquery.get("type", "tcp"), + headerType=netquery.get("headerType", None), + host=netquery.get("host", None), + path=netquery.get("path", None), + seed=netquery.get("seed", None), + quicSecurity=netquery.get("quicSecurity", None), + key=netquery.get("key", None), + mode=netquery.get("mode", None), + serviceName=netquery.get("serviceName", None), ) streamSetting.populateTlsSettings( - streamSecurity = netquery.get("security", TLS), - allowInsecure = allowInsecure, - sni = sni if netquery.get("sni", None) == None else netquery.get("sni", None), - fingerprint = fingerprint, - alpns = netquery.get("alpn", None), - publicKey = None, - shortId = None, - spiderX = None, + streamSecurity=netquery.get("security", TLS), + allowInsecure=allowInsecure, + sni=( + sni + if netquery.get("sni", None) == None + else netquery.get("sni", None) + ), + fingerprint=fingerprint, + alpns=netquery.get("alpn", None), + publicKey=None, + shortId=None, + spiderX=None, ) flow = netquery.get("flow", "") else: streamSetting.populateTlsSettings( - streamSecurity = TLS, - allowInsecure = allowInsecure, - sni = "", - fingerprint = fingerprint, - alpns = None, - publicKey = None, - shortId = None, - spiderX = None, + streamSecurity=TLS, + allowInsecure=allowInsecure, + sni="", + fingerprint=fingerprint, + alpns=None, + publicKey=None, + shortId=None, + spiderX=None, ) server = outbound.settings.servers[0] @@ -1337,39 +1456,43 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): server.flow = flow v2rayConfig = V2rayConfig( - _comment = Comment(remark = name), - log = get_log(), - inbounds = [get_inbound()], - outbounds = [outbound, get_outbound1(), get_outbound2()], - dns = get_dns(dns_list = dns_list), - routing = get_routing(), + _comment=Comment(remark=name), + log=get_log(), + inbounds=[get_inbound()], + outbounds=[outbound, get_outbound1(), get_outbound2()], + dns=get_dns(dns_list=dns_list), + routing=get_routing(), ) - v2rayConfig_str_json = json.dumps(v2rayConfig, default = vars) - - res = json.loads(v2rayConfig_str_json) - res = remove_nulls(res) - - return json.dumps(res) - elif protocol == EConfigType.SHADOWSOCKS.protocolName: outbound = get_outbound_ss() if not try_resolve_resolve_sip002(raw_config, outbound): - result = raw_config.replace(EConfigType.SHADOWSOCKS.protocolScheme, "") + result = raw_config.replace( + EConfigType.SHADOWSOCKS.protocolScheme, "" + ) index_split = result.find("#") if index_split > 0: try: - outbound.remarks = unquote(result[index_split + 1:]) + outbound.remarks = unquote(result[index_split + 1 :]) except Exception as e: - None # print(e) + None # print(e) result = result[:index_split] # part decode index_s = result.find("@") - result = base64.b64decode(result[:index_s]).decode(encoding = "utf-8", errors = "ignore") + result[index_s:] if index_s > 0 else base64.b64decode(result).decode(encoding = "utf-8", errors = "ignore") + result = ( + base64.b64decode(result[:index_s]).decode( + encoding="utf-8", errors="ignore" + ) + + result[index_s:] + if index_s > 0 + else base64.b64decode(result).decode( + encoding="utf-8", errors="ignore" + ) + ) - legacy_pattern = re.compile(r'^(.+?):(.*)@(.+):(\d+)\/?.*$') + legacy_pattern = re.compile(r"^(.+?):(.*)@(.+):(\d+)\/?.*$") match = legacy_pattern.match(result) if not match: @@ -1382,37 +1505,34 @@ def generateConfig(config: str, dns_list = ["8.8.8.8"]): server.method = match.group(1).lower() v2rayConfig = V2rayConfig( - _comment = Comment(remark = outbound.remarks), - log = get_log(), - inbounds = [get_inbound()], - outbounds = [outbound, get_outbound1(), get_outbound2()], - dns = get_dns(dns_list = dns_list), - routing = get_routing(), + _comment=Comment(remark=outbound.remarks), + log=get_log(), + inbounds=[get_inbound()], + outbounds=[outbound, get_outbound1(), get_outbound2()], + dns=get_dns(dns_list=dns_list), + routing=get_routing(), ) - v2rayConfig_str_json = json.dumps(v2rayConfig, default = vars) + v2rayConfig_str_json = json.dumps(v2rayConfig, default=vars) - res = json.loads(v2rayConfig_str_json) - res = remove_nulls(res) + res = json.loads(v2rayConfig_str_json) + res = remove_nulls(res) - return json.dumps(res) + return res if __name__ == "__main__": parser = argparse.ArgumentParser( - description = "v2ray2json convert vmess, vless, trojan, ... link to client json config." + description="v2ray2json convert vmess, vless, trojan, ... link to client json config." ) parser.add_argument( "config", - nargs = "?", - help = "A vmess://, vless://, trojan://, ... link.", + nargs="?", + help="A vmess://, vless://, trojan://, ... link.", ) - - option = parser.parse_args() config = option.config print(generateConfig(config)) -