diff --git a/ai/worker/runner.gen.go b/ai/worker/runner.gen.go index 68197ee283..a47eb474d0 100644 --- a/ai/worker/runner.gen.go +++ b/ai/worker/runner.gen.go @@ -317,6 +317,9 @@ type LiveVideoToVideoParams struct { // ControlUrl URL for subscribing via Trickle protocol for updates in the live video-to-video generation params. ControlUrl *string `json:"control_url,omitempty"` + // DataUrl URL for publishing data via Trickle protocol for pipeline status and logs. + DataUrl *string `json:"data_url,omitempty"` + // EventsUrl URL for publishing events via Trickle protocol for pipeline status and logs. EventsUrl *string `json:"events_url,omitempty"` @@ -347,6 +350,9 @@ type LiveVideoToVideoResponse struct { // ControlUrl URL for updating the live video-to-video generation ControlUrl *string `json:"control_url,omitempty"` + // DataUrl URL for publishing data for pipeline + DataUrl *string `json:"data_url,omitempty"` + // EventsUrl URL for subscribing to events for pipeline status and logs EventsUrl *string `json:"events_url,omitempty"` @@ -3160,93 +3166,93 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xdeW/ctrb/KoTeA5IAM97a3D4YuH84SxPj2onhpQtaYy5HOjPDWCJVkrI9zfN3f+Am", - "kRI1I7u229c7f2UscTnr7xySh8rXJGVFyShQKZL9r4lIF1Bg/fPg5PA954yr3xmIlJNSEkaTffUGgXqF", - "OIiSUQGoYBnkW8koKTkrgUsCeoxCzLvdzxdguxcgBJ6D6ieJzCHZT47FXP21LNUfQnJC58nd3Sjh8FtF", - "OGTJ/i961MumS01o3Y9Nv0Aqk7tRclBlhJ1aKruknAb0oxnjCKseaA4UOFatukzpFvpHnn+eJfu/fE3+", - "m8Ms2U/+a7uR5rYV5fYxZARfnB4ld5ejiCTsTJCZmbc63JrpfH4DniJMv2HZcjIHqhues3O4lYrcHi5C", - "ki7KnOHMUYNmJAckGZoCkhxT1XIKmZLJjPECy2Q/mRKK+TJp0ddV4igpQOIMS2xmneEqV/2/3iVtuRxk", - "GVE/cY6+sCki1ExGGLW0lFgIyNQfcgGoJCXkhIZ25OaK0aGUPSFZSEeHio/VfE7oHH2PU2cgh+9QpSZW", - "huLkUTorqac2TbPY1BxkxelEkgKExEUpQhokr6BDx6nug5o+ZvpFoBIk4VZuobOqLBlX1nSN8wrEPnoh", - "gEqgKbwYoRc3jGcvRkiZOTJEoSljOWCKXr5Qk79Q717McC7gxast9M5QhohA9vXLZrxXW64lKgBTgSjz", - "iNyys9l36vd4irXWmjae1CyX541k1sFAxzFidr/CPQ4LPIdzpv/p+se8IhmmKUxEinMI1PTd1uu2jt7T", - "lFUcz0FYS5E1hgAihX6R5kxAvkQ5oVeN8Sq9oZKzopTo5YLMF8Ct7lCBl4hDVqV2CPRbhXMil698uX2w", - "dKIzTWfNL62KKXDFL3EM9ni6GVsyRTmZLdENkYuOX/W7u5FfxNb1uJMVctztyvEdzDloYm4WJDVkNAhp", - "KCUClZVYaBHeYJ4J3YpQIgnOTZutNn1ovZhyxrFYAwkH6IidHqCXR+xmfIrpFTrIcCk1Mr2yisc0Q0QK", - "lDJuomOmvOwGyHwhteMaJrwAg97f4qLMYR99Rb8mOZZA5ThlVBChHG25nafFWFE3Ftlt/muyj3a3dkbo", - "14QCJ1/EdkluIR9jLsfu7d6dL4AjzdiT4WCHn4FQSGGOJbmGiTH+NUScN27yUrzS7lWRDNDNAkv1F9ym", - "eZUBmnFWRER8OKeMKwuaodAg0a/Vzs43Kdr1yf5kSUMnhrQY9VUxMX49KYHHeNhts/BJmxpiMwcIPkaU", - "wC17ASFVgQ5N4xPgHXIIlTA31qvpoTPgoFmT0Aotuzs7/fRkQBkRSse64xY6ZhzMb1SJCucKtQBrzLIQ", - "ZaHIsTKtJBI5uwGOairUMFmVa8+dLlW8ATqXiw5/rj0601THuPPFO8QqVtlkv04FnoFcTtIFpFeB8FTo", - "a0vvBLjCRBVIdTeku2lTFJIUGvdnbexSsFDlmUph2GwGVCgjYxwtMC9mVe6TeWZGfauJqYm10VpTC5B1", - "JXIG1i05phkrkMG3HlGoxlF5O10FUtjZ+p8euGYzk4o0aRouy5w0QY6D07HRzMsd9WY3CGRnbs4ONrfi", - "fukUaAJbJAEIIvv6DCCeIA8OmzXrjxY5HzFBrVUyFJb/EBr3T9nndS3drlPpwJzuB5IB66p01gLFf8QW", - "ZDOOCxAakAWkjGbavIM85FoN73P3fQ9uLXTYD+Z8/V10VtMSEYp0OBcDJv1oBo/NO9h26/iDzfg6fv6p", - "VmvIuH86UTDVejKt0iuQbSp2975rk3HhJlQq1qtNRZQSOS5YRaVSgBmzXm75CYXWmQmF6pWFWfWzULHT", - "9rwhea7AnlD9qqPCY9PsjSY6YMwP7YwImOBqPumB5Z29Tp5as6A7I5xlDRgHDJt0GX0MFh520cFBQDHN", - "ddrc29ckvDTlgIXjOwjxmoCDao76AX59+rL3+v9x9rLJK5wkbkjWst7dnb1vY3ioW94LDn/UY3dnvWeE", - "MaFjRYg5g3kBVB7QpVwQOt/rhpkpu41smqJcGxD6FmHO8RLNyTVQhAXCaMpu3RaA9TONiyPF/08///Qz", - "Mmjsc/uG3fauubuTHzq8F4b4hyI8FlcTQstKRvljN2MOguWVBjXVGOnGLabksiSp9kq9WMOo5HBNWCXU", - "j4ykujeR1q5GTVal/WL39uPtj+jlx3/++M+91//QJnl2cBxkksdq5kNN5l9u1VtUufJicTVhlawFuQIP", - "DlVuXcGokaCJKtzuCi5UAq4GNNuCuJiSeaWEaURvzEqMEJtJoOrPrEr1vh9ICdz2lAtMFeIQOs/BU0PA", - "laMcfTaUx8CDKqPKye8wSRnjmbgfeyUjVCLdk1AsQdQBtB63WVJgOgf0y85o99KaiO5t50VwW0IqTfMp", - "mAYchHqoHhn1ZaRQWMmoCCOWnQu9NTzEGPUn6zrDp9s96+VsZrmyimj5ws0COCDAqSUfEaU49PKn0c+v", - "GvQLEmndrE2Zl79rwnI8hTxC2JF+Xmc0AWmOml1EaEZSLX+smsKcs4pmtrWK9ztBkylOr/wmXXLNtCs2", - "xHM2J/Ie1mK6CVTRsfIAsWC5ynC0eZqxEKFCqqjPZopEjXH6fWTT+cjM3tXz0NjRiQkr4sdFWe+EPnDB", - "+cj7tI8DiJVhK3v4fuCaFPC71/9BG1iDpLnZyVqXcd5758g5Z8R/3y4qehXLe1L1QieoSpnaK3FzyNU9", - "P5Z2u6mb9OoBbKarR/VZDLc+Gl3XM/WM6V53BiYSCkXQnTdHPVY9kQ5jHUlKv6EizJOlEVREgh9OLt6y", - "oqwkHNJZ5Oz5uD6Ez0Biosz/w8kFSk0f/xi4K1QDXzXWxXMv/MWUMzSJ4he/bMDz1wIKxpeTGQcIOujH", - "6Hv1eEU3ySTOI/3O9fNoR0JbpOkH0e0AXAQ0fVJ/r91XUwKhpmVAZMiqk5EjyNNqS3lx9V5IkpPftYrW", - "qVhptmqaIyGxJEKSVDxQuc+ssWFqGCUejxNryX43T2LIyjc6nT+MoblvFEP5gIXwUJOIcRAlKLSWti1E", - "LObj+flJT42RejWwyMiAxfCCnLpeqFuQ887hjpk5QJy2/Oy0HtMNOz28/oBzkunhaq77WHHgvJKT9nge", - "khtOYjDuU9seIEY35tkN5trrrSwG1VEp/16J2POyUvmeqaSqC35OgjarmG8BksfZh7JCfSbnJ7uDjkNs", - "bu23P3HP1qFu2TSs5x01jPumE5HyCmWcSSzFIDVwwPlYRWutkFUgq8gSbtwHKqTt8y2lGLL/ulox7EfU", - "0kv4R8C5XLx1eXYoUTVcJeJp2UJ3RKaJS808yoBWhSL26PPBu8NPH5JR8vlfySh5f3r6+TQZJYfvjt77", - "hJ6ZqdaxbinyOfQYiPCnF433qp+MLVgi69wesbRXGX6eur7K0rc3UzaxLo+1tFy2+q2qrvROIu8lGJ3c", - "r5JL/7qgkYqu8lu7KGgn660EPcZBhNGjo+O3C0ZSiMWp3JRxDou4R0fHx6bUN7m7vBslM0KJWEw4YOGi", - "ibftUB+q6lbo1LSKbXbTDG6DzFA/iKd5ZvoHkty2Gj2PJ9FGVHE5HjfTh4JMGZVApc/EW/sotlnG8gDy", - "Tlm+Hu64aeRmCql2hMXJPoXfKhCR2ogC304kuwLaPqX7h78Lf4vOTZsVGhGDkx1fI36drx2m7egW3Hut", - "S4eX6N6J5ICLoJ+ucg0rVXAR3cqQUJTKVSsOrUPa73ynbRpFzkIlKyfhrs141+vMSvSvqERVv7JdguZ3", - "O1lbWlMrJbQSZwd9VtKgYMu6tU/cS8XWje787QQzSFfBKQcFioHz2EfRooghq0hnM2vNpHIOvYYh7QIX", - "xmxjK7DCTpDWlFfWI5302rpYidjedBGwKcoc9Iqtcd4GdtzLVV5rdtAi3c1e5KquenEZ6amX2P0do1t4", - "bphRhKfWTKH0PPHE5EeuQZ//2mPgE8yxsdouanOWTyqer9k0vzg90sFfVFNdqE/oHF0TjM45Sa/0URuT", - "LGW53ULP9ImKrRbJybUtGRlLNm7XxaBSE+cnA28NWeiCRy0WrpVt3oPosprmRCwUzaZvP+kuc3UJLaYZ", - "ytk8IO+9GaOHujmWcIOXE26wZv15hEqMDt+51PmD6Y5sd/RSUZUzc2RRVrxkAkRYOG97WHDrO6/FlMwG", - "0+NaK8L04bYijfF0AUJyLBkfQNdxPUb2B05rPuEC2ssKJBniFe3al3phfnxh0y30iUmSApK6tHpBBCIC", - "UVxAhtzkrnDKlaibXW8mF8ARZ5UEMdInKkSijIFAlElTA6pmwih6lGRqquAWp9I8eyleoQxKoJlAjIac", - "EOX1BVBpy01phgpdQDfVNUozMq84nuagLVb1/Ldxl38jzOeVK4EYtNSsMaCW9te7zrmkvXygG4ME7h2x", - "Ri4nWVyJIJB1ucZJw90pIQk1HCsftepllZwzc/al8hIlYjtMMKd51Od+pu8wKzf5j7Lx9dbs2kZl61AR", - "4vyesYqn4LNKaMqKkNV6DCSDcqCz+nmU4/bCOKAk1IMfQeIBYkAouddScTXyd1eO949HOtS4gsfV0z1h", - "gPGjomQuyKyKJ4PDyXDc/uzDcwDg2oOJcDFlOEY/rhcjyYb78X3DZ7O1sILd1SHysb34uZ14VUZ9jMWV", - "uJfvmr6u0K7HYf1yl7ZSOL4ZoYp6FU9NPZZAL03XV3V80QVc4XWvsJglLN9bu+/WGU+LIKr3lPG+fTwt", - "jxfCxOJMF0eY5ppuXe8UThlAtxl47TVwS5hwza1UL1u0r9Sv3jaMHOoX6oVTpsJXTEzNs3cXGU9ZJVtl", - "qbpfV+FUzG660/y4AOkqyM2EN1igWY7nc8gQFujT2fc/BuUmapjhJRRKE+qNqdLxy/3rGQeV7Ub9Wg2u", - "nNoUjTUspJiqLAynKQhh7orXx5QDnNi4rjCkaLH5+tTq6tPjxelRTJU62qhk3Fwp7aUy1Nhz89zmUjET", - "YfTxt511PYgYsvFsSkeG78mbQpC7VmVIbCfnabe+R47Hy7D3KmBQ7+0Nsr5dgL/PDfHHvKbUuX+94prS", - "5sr15sr13/fK9ev/6BvX6AxKrOWsrzyUZsdQl8DrzaIX//tCmYaoP1gyXTaF8Zsq1z/tXlUHvwfeq7IG", - "0wqxYQjtjbNnJUC66Au0ARc+ZB2gQuGJKAFfAUcZ5OQauFA6zhX450sEtyUHofWmwgSmWtWZ6gPpwlXr", - "KqPTtqoeZ7plSWSqPaezend/Kdm5qdUSVgLYdEv9ZcaP69Eb5Amvfw+hZFW0aJKy1SHCXDTS+zmrpurN", - "10J7CUwhYjBra+hylgYHjpgu7Xl/m8OvHZu+vPNjeNqqw2pOls1nzVpn0FEZ6gdNU00zOldP16Wuig8z", - "lW3pudaAur0fgAvrMK2z+yettRol183EXfixLx3snFaUAveMxFH9sBouN/XligE9Cd17X3b9Tqz5eMC6", - "pYy7aq/aBqupe5Y6tVdRrnDGELGm9MmS6stq9W6YjmFpxYlcnilSDJ8fz89P3gDmwOvvB+rAZx7Vgyyk", - "LJO7O121EysMP7DfDEnrz7zxiqKDw3on2N/6PSLXUCq0PThsTKi2u2Rna/ebrddKIqwEikuS7CffbO1u", - "7Sh1YbnQdG/rz4eNJRs7nCuZiCU89TfWvE/imWt8dkHKSmsOh5labbW/P2a3U9+wbNmq+DGJEeZyW2Um", - "Y/dpPKPndVYQ+9jZXahjlQbpB0ajmu29nZ0WFZ7Yt7/YeqxhJARraD13K7ep9H7IrMpR02yUfPuIJDQl", - "15H53+DMne6aeXefZ94Liiu5YJz8DpmeePeb55nY7dO/p1KtFM4ZQ0eYm4Kpb3dfPxf3TU6vocqEO0XC", - "3t6jktApf+8S0zRBdYn86+eyv0MqgVOcozPg18AdBR6O6rTER9BfLu8uR4moigLzpfuWJjpnyGVPeC4U", - "eLtYouD7dmyyUCyWY4oLGLNr4JxkGvoDdBgl2wtb0bztYHgOWgQhiPnl6MkTIkis7H0okNz5cnIDmbr/", - "kNO6qH0lq67E+8l5NRP9MS7dGIpNXcDdz555/ZR8eRXkD+PKkKi50atPFZTry9DxqHxQlvnS3YgOPjol", - "TIlJyZnKsrz1bCdMt74S9sRxOpjtmQN1WNO+idT9kXoToe4bocynZc4Zqr8vcM8QRULH8EFgQGau9/QM", - "DqxPzMOPyD2Pw/8ZiXnsgsfG6//i+fkGeh4MPQ9MjkngoT7wXNffj4wiz4fYVxPvlXS4r4w9DwaZ2Z4Z", - "hMLdpA38bJKOJ/D8+mt9D3N95xijZDsn1zAOi2DXLT+iCw+vqt6UN/pfQZYVp5AhoJn+UJaIQkS7PnEl", - "TDxcRz21zM+MEr3FmBvA2ADG4wGGMjMDFn8ENfK2ZxrkyIsBqYI+jq10yQdGOabzSkFYXe3QRYGj46dy", - "/OZC53M7u3d9cePfG/9+RP/W3nJvf84L48K2Wn+M7Qccx3v9Hm2/9Whrw/VtTUxXZPyRb0M+cdbfmfGZ", - "3Tysut84+sbRH8/Rnfc540Z7D/B70XWQUbKtIvSAo4cPraJtc8+3KYuMJ/VeMdwThfVuud3mlGHj9n8T", - "t9eFhn/gkEF67hc4uylZHLTVF3bx/4NM8/8aujvqbhNQNsWRmGZelWrwv0b2IIUpg3xSqAgqLZ8ZK8L/", - "w3SDFRuseHysqF3oYWBhu2u0qLxvtkdhwn43ul4JoOnS/ac4+taoFKj5rzGibt98efqJVwduok12sPH4", - "v4nHe19tv6erV7UzjJJtr3I9WkvV1JI/3aGZneJBhVRBZ6GlKbTsWv8HiKuefpuzKkNvWVFUlMil+4ZS", - "Yi9865ptsb+9nXHAxdh+oGkrt923UtVdX6PoGf9M6hSpb9h6IKHbbeOSbE9B4u1aeXeXd/8XAAD//5FW", - "1i+hfgAA", + "H4sIAAAAAAAC/+xdeW/ctrb/KoTeA+IAM97atO8ZuH84SxPj2onhpQtaw5cjnZlhLJEqSdme5vm7P3CT", + "SImakR3b7e2dvzKWuJz1dw7JQ+VLkrKiZBSoFMnel0Skcyiw/rl/fPCOc8bV7wxEykkpCaPJnnqDQL1C", + "HETJqABUsAzyzWSUlJyVwCUBPUYhZt3uZ3Ow3QsQAs9A9ZNE5pDsJUdipv5alOoPITmhs+TubpRw+L0i", + "HLJk71c96kXTpSa07scmnyGVyd0o2a8ywk4slV1STgL60ZRxhFUPNAMKHKtWXaZ0C/0jzz9Nk71fvyT/", + "zWGa7CX/tdVIc8uKcusIMoLPTw6Tu4tRRBJ2JsjMzJsdbs10Pr8BTxGmX7NscTkDqhuesTO4lYrcHi5C", + "ks7LnOHMUYOmJAckGZoAkhxT1XICmZLJlPECy2QvmRCK+SJp0ddV4igpQOIMS2xmneIqV/2/3CVtuexn", + "GVE/cY4+swki1ExGGLW0lFgIyNQfcg6oJCXkhIZ25OaK0aGUfUmykI4OFR+q2YzQGfoBp85ADt6iSk2s", + "DMXJo3RWUk9tmmaxqTnIitNLSQoQEhelCGmQvIIOHSe6D2r6mOnngUqQhFu5iU6rsmRcWdM1zisQe+iF", + "ACqBpvBihF7cMJ69GCFl5sgQhSaM5YAp2nihJn+h3r2Y4lzAi5eb6K2hDBGB7OuNZryXm64lKgBTgSjz", + "iNy0s9l36vd4grXWmjae1CyXZ41kVsFAxzFidr/EPQ4KPIMzpv/p+sesIhmmKVyKFOcQqOn7zVdtHb2j", + "Kas4noGwliJrDAFECv0izZmAfIFyQq8a41V6QyVnRSnRxpzM5sCt7lCBF4hDVqV2CPR7hXMiFy99ub23", + "dKJTTWfNL62KCXDFL3EM9ni6GVsyRTmZLtANkfOOX/W7u5FfxNb1uJdL5LjTleNbmHHQxNzMSWrIaBDS", + "UEoEKisx1yK8wTwTuhWhRBKcmzabbfrQajHljGOxAhL20SE72Ucbh+xmfILpFdrPcCk1Mr20isc0Q0QK", + "lDJuomOmvOwGyGwuteMaJrwAg97d4qLMYQ99Qb8lOZZA5ThlVBChHG2xlafFWFE3Ftlt/luyh3Y2t0fo", + "t4QCJ5/FVkluIR9jLsfu7e6dL4BDzdiT4WCHn4FQSGGGJbmGS2P8K4g4a9xkQ7zU7lWRDNDNHEv1F9ym", + "eZUBmnJWRER8MKOMKwuaotAg0W/V9vY3Kdrxyf5oSUPHhrQY9VVxafz6sgQe42GnzcJHbWqITR0g+BhR", + "ArfsBYRUBTowjY+Bd8ghVMLMWK+mh06Bg2ZNQiu07Gxv99OTAWVEKB3rjpvoiHEwv1ElKpwr1AKsMctC", + "lIUix8qkkkjk7AY4qqlQw2RVrj13slDxBuhMzjv8ufboVFMd484X7xCrWGaT/ToVeApycZnOIb0KhKdC", + "X1t6x8AVJqpAqrsh3U2bopCk0Lg/bWOXgoUqz1QKw6ZToEIZGeNojnkxrXKfzFMz6htNTE2sjdaaWoCs", + "K5FTsG7JMc1YgQy+9YhCNY7K2+kqkML25v/0wDWbmlSkSdNwWeakCXIcnI6NZja21ZudIJCdujk72NyK", + "+6VToAlskQQgiOyrM4B4gjw4bNasP1rkfMQEtVbJUFj+KjTun7LP61q6XaXSgTndjyQD1lXptAWK38UW", + "ZFOOCxAakAWkjGbavIM85FoN73P3Qw9uzXXYD+Z89X10VtMSEYp0OBcDJv1gBo/NO9h26/iDzfg6fv6p", + "VmvIuH86UTDV+nJSpVcg21Ts7H7fJuPcTahUrFebiiglclywikqlADNmvdzyEwqtMxMK1SsLs+pnoWKn", + "7XlD8lyBPaH6VUeFR6bZa010wJgf2hkRcImr2WUPLG/vdvLUmgXdGeEsa8A4YNiky+hDsPCwiw4OAopJ", + "rtPm3r4m4aUpBywc30GI1wTsVzPUD/Cr05fdV//G2cs6r3CSuCFZy3p3tne/jeGhbnkvOPxJj92d9Z4R", + "xoSOJSHmFGYFULlPF3JO6Gy3G2Ym7DayaYpybUDoW4Q5xws0I9dAERYIowm7dVsA1s80Lo4U/z//8vMv", + "yKCxz+1rdtu75u5OfuDwXhjiH4rwWFxdElpWMsofuxlzECyvNKipxkg3bjElFyVJtVfqxRpGJYdrwiqh", + "fmQk1b2JtHY1arIq7Rc7tx9uf0IbH/7x0z92X32nTfJ0/yjIJI/UzAeazL/cqreocuXF4uqSVbIW5BI8", + "OFC5dQWjRoImqnC7KzhXCbga0GwL4mJCZpUSphG9MSsxQmwqgao/syrV+34gJXDbU84xVYhD6CwHTw0B", + "V45y9MlQHgMPqowqJ3/AZcoYz8T92CsZoRLpnoRiCaIOoPW4zZIC0xmgX7dHOxfWRHRvOy+C2xJSaZpP", + "wDTgINRD9cioLyOFwkpGRRix7FzojeEhxqg/WdcZPt7uWi9nU8uVVUTLF27mwAEBTi35iCjFoY2fR7+8", + "bNAvSKR1szZlXv6uCcvxBPIIYYf6eZ3RBKQ5anYQoRlJtfyxagozziqa2dYq3m8HTSY4vfKbdMk10y7Z", + "EM/ZjMh7WIvpJlBFx8oDxJzlKsPR5mnGQoQKqaI+myoSNcbp95FN50Mze1fPQ2NHJyYsiR/nZb0T+sAF", + "5yPv0z4OIFaGrezh+4ErUsDvX/0HbWANkuZ6J2tVxnnvnSPnnBH/fTOv6FUs70nVC52gKmVqr8TNIVf3", + "/Fja7aZu0qsHsJmuHtVnMdz6aHRdz9QzpnvdGZhIKBRBd94c9Vj1RDqMdSQp/YaKME+WRlARCb4/Pn/D", + "irKScECnkbPno/oQPgOJiTL/98fnKDV9/GPgrlANfNVYF8+98GdTztAkip/9sgHPXwsoGF9cTjlA0EE/", + "Rj+ox0u6SSZxHul3pp9HOxLaIk0/iG4H4CKg6aP6e+W+mhIINS0DIkNWnYwcQZ5WW8qLq/dckpz8oVW0", + "SsVKs1XTHAmJJRGSpOKByn1mjQ1TwyjxeLy0lux38ySGrHyj0/nDGJr7RjGUD1gIDzWJGAdRgkJradtC", + "xGI+nJ0d99QYqVcDi4wMWAwvyKnrhboFOW8d7piZA8Rpy89O6zHdsNPD6484J5kerua6jxUHzks5aY/n", + "IbnhJAbjPrXtAWJ0Y57dYK693spiUB2V8u+liD0rK5XvmUqquuDnOGizjPkWIHmcvS8r1GdyfrI76DjE", + "5tZ++2P3bBXqlk3Det5Rw7hvOhEpL1HGqcRSDFIDB5yPVbTWClkGsoos4cZ9oELaPt9SiiH7r6sVw35E", + "Lb2EfwCcy/kbl2eHElXDVSKels11R2SauNTMowxoVShiDz/tvz34+D4ZJZ/+mYySdycnn06SUXLw9vCd", + "T+ipmWoV65Yin0OPgQh/etF4r/rJ2IIlss7tEUt7leHnqaurLH17M2UTq/JYS8tFq9+y6krvJPJegtHJ", + "/TK59K8LGqnoKr+Vi4J2st5K0GMcRBg9PDx6M2ckhVicyk0Z57CIe3h4dGRKfZO7i7tRMiWUiPklByxc", + "NPG2HepDVd0KnZhWsc1umsFtkBnqB/E0z0z/QJLbVqPn8STaiCoux6Nm+lCQKaMSqPSZeGMfxTbLWB5A", + "3gnLV8MdN43cTCHVjrA42SfwewUiUhtR4NtLya6Atk/pvvN34W/RmWmzRCNicLLja8Sv87XDtB3dgnuv", + "denwEt07kRxwEfTTVa5hpQouolsZEopSuWrFoXVI+73vtE2jyFmoZOVluGsz3vE6sxL9MypR1a9sl6D5", + "3Y5XltbUSgmtxNlBn5U0KNiybu0T91KxdaM7fzvBDNJVcMpBgWLgPPZRtChiyCrS2cxKM6mcQ69gSLvA", + "uTHb2AqssBOkNeWV9UgnvbYuliK2N10EbIoyB71ia5y3gR33cpnXmh20SHezF7msq15cRnrqJXZ/x+gW", + "nhtmFOGpNVMoPU88MfmRa9Dnv/YY+BhzbKy2i9qc5ZcVz1dsmp+fHOrgL6qJLtQndIauCUZnnKRX+qiN", + "SZay3G6hZ/pExVaL5OTaloyMJRu362JQqYnzk4E3hix0zqMWm2GJ70FyWU1yIuaKYtWzn2yXtbpkFtMM", + "5WwWkPZWjdBDF1wrn3kYZabv19H2zozRQ90MS7jBi0tuMHD1OYlK2A7eupT+vemObHe0oajKmTlKKSte", + "MgEiLOi3PSzo9p0jY0qmg+lxrRVh+tBdkcZ4OgchOZaMD6DrqB4j+4pTpI+4gPZyB0mGeEW7dq9emB+f", + "2WQTfWSSpICkLvmeE4GIQBQXkCE3uSvocqXzZjeeyTlwxFklQYz0SQ+RKGMgEGXS1KaqmTCKHnGZWi+4", + "xak0zzbES5RBCTQTiNGQE6LQqAAqbRkszVChC/smunZqSmYVx5MctMWqnv8ybvwvhPmscqUZg5bANTbV", + "0v5y1zkvtZcidGOQwL2j38ilKYt3EWS0Ltc4abhrJiShhmPlo1a9rJIzZs7kVL6kRGyHCeY0j/rcz/Qd", + "ZuUmL1M2vtqaXduobB1aQ5zfU1bxFHxWCU1ZEbJaj4FkUKZ0Wj+PctxesAeUhHrwI1s8cA0Icfdawi6P", + "SN0V7f3jpA6BrhBz+XTPFvj8OPJUIc3PDyRzYW1ZBBscwIZHik9+QAhChsYMIlwUGx4VHhc3kGTDkeO+", + "AbvZZFnC7vKg/Ni48dywsWxtcYTFlbgXWpi+ruSwByL8wp+2Uji+GaGKerVfTWWaQBum68s6oulStvDi", + "W1jWExYyrtyB7IynRRDVe8p4346mlscLYaJ/pstETHNNt678CqcMgoUZeOWFeEuYcM2tVC9atC/Vr95A", + "jZQ3FOqFU6ZCdExM9bd3KxtPWCVbBbq6X1fhVExvutP8NAfpaunNhDdYoGmOZzPIEBbo4+kPPwWFN2qY", + "4cUkShPqjalX8i8+1DMOKmCO+rUaXDm1KZ9rWEgxVXkfTlMQwtyarw9sBzixcV1hSNFi8/Wp1dWnx/OT", + "w5gqdbRR6b+5XNtLZaix5+a5zaViJsLo42/A68oYMWQL3hTRDD+dMCUxd60amdie1tMeAowcjxdh72XA", + "oN7bu3R9+yF/n7vyj3lhq3MTfcmFrfXl8/Xl87/v5fNX/9F3z9EplFjLWV/+KM0epb4MoLenXvzfC2Ua", + "ov50y2TRXBFY1/v+aTfMOvg98IaZNZhWiA1DaG+cPS0B0nlfoA248CFrHxUKT0QJ+Ao4yiAn18CF0nGu", + "wD9fILgtOQitNxUmMNWqzlQfSOeublkZnbZV9TjTLUsiU+05ndW7+0vJzk2tlrASwKZb6i8zflyP3iBP", + "eBF+CCXLokWTlC0PEebKld7PWTZVb74W2ktgChGDWVlNmLM0OHrFdGErH9ocfunY9MWdH8PTVkVac8Zu", + "PvDWOo2PylA/aJpqmtGZeroqdVV8mKlsS8+1BlQw/ghcWIdpVTE8adXZKLluJu7Cj33pYOekohS4ZySO", + "6odVs7mpL5YM6Eno3jvBq/d+zWcUVi1l3EcHVNtgNXXPoq/2KsqVEBkiVhSBWVJ9WS3fDdMxLK04kYtT", + "RYrh88PZ2fFrwBx4/SVFHfjMo3qQuZRlcnen65diJfL79uspaf3BO15RtH9Q7wT7W7+H5BpKhbb7B40J", + "1XaXbG/ufLP5v0oirASKS5LsJd9s7mxuK3VhOdd0b+kPqY0lGzucK5mIJTz11+a8jwOaC412QcpKaw4H", + "mVpttb/EZrdTX7Ns0ap9MokR5nJLZSZj95FAo+dVVhD77NtdqGOVBukHRqOa7d3t7RYVnti3PtvKtGEk", + "BGtoPXcrt6n0fsi0ylHTbJR8+4gkNMXnkflf48ydJ5t5d55n3nOKKzlnnPwBmZ5455vnmdjt07+jUq0U", + "zhhDh5ib0rFvd149F/dNTq+hyoQ7RcLu7qOS0LkI0CWmaYLqywKvnsv+DqgETnGOToFfA3cUeDiq0xIf", + "QX+9uLsYJaIqCswX7qui6Iwhlz3hmVDg7WKJgu/bsclCsViMKS5gzK6Bc5Jp6A/QYZRszW1t95aD4Rlo", + "EYQg5hfmJ0+IILELAEOB5M6XkxvI3IAIOa3L+5ey6ordn5xXM9HXcenGUGzqUvZ+9szrp+TLq6V/GFeG", + "RM2NXn2qoFxfC49H5f2yzBfubnjw+S1hilpKzlSW5a1nO2G69b20J47TwWzPHKjD6v51pO6P1OsIdd8I", + "ZT6yc8ZQ/aWFe4YoEjqGDwIDMnO9p2dwYHViHn5O73kc/s9IzGNXXdZe/xfPz9fQ82DoeWByTAIP9YHn", + "uv6SZhR53se+H3mvpMN9b+15MMjM9swgFO4mreFnnXQ8gefX3y18mOs7xxglWzm5hnFYdrtq+RFdeHh1", + "/Ka80f8etKw4hQwBzfQnw0QUItr1iUth4uE66qmefmaU6C3GXAPGGjAeDzCUmRmw+BrUyNueaZAjLwak", + "Cvo4ttIlHxjlmM4qBWF1tUMXBQ6Pnsrxm6utz+3s3kXOtX+v/fsR/Vt7y739OS+MC9tq/TG2n7Ic7/Z7", + "tP3qpa0N1/dWMV2S8Ue+kvnEWX9nxmd287Dqfu3oa0d/PEd33ueMG+0+wO9F10FGyZaK0AOOHt63irbN", + "zeKmLDKe1HvFcE8U1rvldutThrXb/03cXhcafsUhg/TcL3B2U7I4aKsv7OL/V6Hmf3h0t+LdJqBsiiMx", + "zbwq1eD/z+xBClMG+aRQEVRaPjNWhP+b6xor1ljx+FhRu9DDwMJ212hReV+vj8KE/YJ2vRJAk4X774H0", + "rVEpUPOfhETdvvkG9xOvDtxE6+xg7fF/E4/3vl9/T1evamcYJVte5Xq0lqqpJX+6QzM7xYMKqYLOQktT", + "aNm1/jcUVz39JmdVht6woqgokQv31abEXvjWNdtib2sr44CLsf0k1GZuu2+mqru+RtEz/qnUKVLfsPVA", + "QrfbwiXZmoDEW7Xy7i7u/j8AAP//9DvSdKt/AAA=", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/core/livepeernode.go b/core/livepeernode.go index 99c28667ab..0a7ebbce06 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -19,6 +19,7 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/go-livepeer/media" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/go-livepeer/trickle" @@ -174,6 +175,7 @@ type LivePipeline struct { Pipeline string ControlPub *trickle.TricklePublisher StopControl func() + DataWriter *media.SegmentWriter } // NewLivepeerNode creates a new Livepeer Node. Eth can be nil. diff --git a/server/ai_http.go b/server/ai_http.go index 97a68a6530..b084ba4a16 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -145,8 +145,29 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subUrl = pubUrl + "-out" controlUrl = pubUrl + "-control" eventsUrl = pubUrl + "-events" + dataUrl = pubUrl + "-data" ) + //if data is not enabled remove the url and do not start the data channel + if enableData, ok := (*req.Params)["enableData"]; ok { + if val, ok := enableData.(bool); ok { + //turn off data channel if request sets to false + if !val { + dataUrl = "" + } else { + clog.Infof(ctx, "data channel is enabled") + } + } else { + clog.Warningf(ctx, "enableData is not a bool, got type %T", enableData) + } + + //delete the param used for go-livepeer signaling + delete((*req.Params), "enableData") + } else { + //default to no data channel + dataUrl = "" + } + // Handle initial payment, the rest of the payments are done separately from the stream processing // Note that this payment is debit from the balance and acts as a buffer for the AI Realtime Video processing payment, err := getPayment(r.Header.Get(paymentHeader)) @@ -181,6 +202,13 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json") eventsCh.CreateChannel() + //optional channels + var dataCh *trickle.TrickleLocalPublisher + if dataUrl != "" { + dataCh = trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl") + dataCh.CreateChannel() + } + // Start payment receiver which accounts the payments and stops the stream if the payment is insufficient priceInfo := payment.GetExpectedPrice() var paymentProcessor *LivePaymentProcessor @@ -200,6 +228,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subCh.Close() eventsCh.Close() controlPubCh.Close() + if dataCh != nil { + dataCh.Close() + } cancel() } return err @@ -227,17 +258,25 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { }() // Prepare request to worker + // required channels controlUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, controlUrl) eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl) subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl) publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl) + // optional channels + var dataUrlOverwrite string + if dataCh != nil { + dataUrlOverwrite = overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl) + } + workerReq := worker.LiveVideoToVideoParams{ ModelId: req.ModelId, PublishUrl: publishUrlOverwrite, SubscribeUrl: subscribeUrlOverwrite, EventsUrl: &eventsUrlOverwrite, ControlUrl: &controlUrlOverwrite, + DataUrl: &dataUrlOverwrite, Params: req.Params, GatewayRequestId: &gatewayRequestID, ManifestId: &mid, @@ -255,6 +294,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subCh.Close() controlPubCh.Close() eventsCh.Close() + if dataCh != nil { + dataCh.Close() + } cancel() respondWithError(w, err.Error(), http.StatusInternalServerError) return @@ -266,6 +308,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { SubscribeUrl: subUrl, ControlUrl: &controlUrl, EventsUrl: &eventsUrl, + DataUrl: &dataUrl, RequestId: &requestID, ManifestId: &mid, }) diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 2acd3d40f2..1c02271d3a 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -1,6 +1,7 @@ package server import ( + "bufio" "context" "encoding/json" "errors" @@ -505,8 +506,9 @@ func registerControl(ctx context.Context, params aiRequestParams) { } params.node.LivePipelines[stream] = &core.LivePipeline{ - RequestID: params.liveParams.requestID, - Pipeline: params.liveParams.pipeline, + RequestID: params.liveParams.requestID, + Pipeline: params.liveParams.pipeline, + DataWriter: params.liveParams.dataWriter, } } @@ -760,6 +762,125 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar }() } +func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) { + //only start DataSubscribe if enabled + if params.liveParams.dataWriter == nil { + return + } + + // subscribe to the outputs + subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{ + URL: url.String(), + Ctx: ctx, + }) + if err != nil { + clog.Infof(ctx, "Failed to create data subscriber: %s", err) + return + } + + dataWriter := params.liveParams.dataWriter + + // read segments from trickle subscription + go func() { + defer dataWriter.Close() + + var err error + firstSegment := true + + retries := 0 + // we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length + const retryPause = 300 * time.Millisecond + const maxRetries = 5 + for { + select { + case <-ctx.Done(): + clog.Info(ctx, "data subscribe done") + return + default: + } + if !params.inputStreamExists() { + clog.Infof(ctx, "data subscribe stopping, input stream does not exist.") + break + } + var segment *http.Response + readBytes, readMessages := 0, 0 + clog.V(8).Infof(ctx, "data subscribe await") + segment, err = subscriber.Read() + if err != nil { + if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) { + stopProcessing(ctx, params, fmt.Errorf("data subscribe stopping, stream not found, err=%w", err)) + return + } + var sequenceNonexistent *trickle.SequenceNonexistent + if errors.As(err, &sequenceNonexistent) { + // stream exists but segment doesn't, so skip to leading edge + subscriber.SetSeq(sequenceNonexistent.Latest) + } + // TODO if not EOS then signal a new orchestrator is needed + err = fmt.Errorf("data subscribe error reading: %w", err) + clog.Infof(ctx, "%s", err) + if retries > maxRetries { + stopProcessing(ctx, params, errors.New("data subscribe stopping, retries exceeded")) + return + } + retries++ + params.liveParams.sendErrorEvent(err) + time.Sleep(retryPause) + continue + } + retries = 0 + seq := trickle.GetSeq(segment) + clog.V(8).Infof(ctx, "data subscribe received seq=%d", seq) + copyStartTime := time.Now() + + defer segment.Body.Close() + scanner := bufio.NewScanner(segment.Body) + for scanner.Scan() { + writer, err := dataWriter.Next() + if err != nil { + if err != io.EOF { + stopProcessing(ctx, params, fmt.Errorf("data subscribe could not get next: %w", err)) + } + return + } + n, err := writer.Write(scanner.Bytes()) + if err != nil { + stopProcessing(ctx, params, fmt.Errorf("data subscribe could not write: %w", err)) + } + readBytes += n + readMessages += 1 + } + if err := scanner.Err(); err != nil { + clog.InfofErr(ctx, "data subscribe error reading seq=%d", seq, err) + subscriber.SetSeq(seq) + retries++ + continue + } + + if firstSegment { + firstSegment = false + delayMs := time.Since(params.liveParams.startTime).Milliseconds() + if monitor.Enabled { + monitor.AIFirstSegmentDelay(delayMs, params.liveParams.sess.OrchestratorInfo) + monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{ + "type": "gateway_receive_first_data_segment", + "timestamp": time.Now().UnixMilli(), + "stream_id": params.liveParams.streamID, + "pipeline_id": params.liveParams.pipelineID, + "request_id": params.liveParams.requestID, + "orchestrator_info": map[string]interface{}{ + "address": params.liveParams.sess.Address(), + "url": params.liveParams.sess.Transcoder(), + }, + }) + } + } + + clog.V(8).Info(ctx, "data subscribe read completed", "seq", seq, "bytes", humanize.Bytes(uint64(readBytes)), "messages", readMessages, "took", time.Since(copyStartTime)) + } + }() +} + func (a aiRequestParams) inputStreamExists() bool { if a.node == nil { return false @@ -795,7 +916,7 @@ const maxInflightSegments = 3 // If inflight max is hit, returns true, false otherwise. func (s *SlowOrchChecker) BeginSegment() (int, bool) { // Returns `false` if there are multiple segments in-flight - // this means the orchestrator is slow reading them + // this means the orchestrator is slow reading // If all-OK, returns `true` s.mu.Lock() defer s.mu.Unlock() diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index e65903525f..99db461019 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -109,6 +109,10 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error { ls.HTTPMux.Handle("OPTIONS /live/video-to-video/{streamId}/status", ls.WithCode(http.StatusNoContent)) ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus()) + // Stream data SSE endpoint + ls.HTTPMux.Handle("OPTIONS /live/video-to-video/{stream}/data", ls.WithCode(http.StatusNoContent)) + ls.HTTPMux.Handle("GET /live/video-to-video/{stream}/data", ls.GetLiveVideoToVideoData()) + //API for dynamic capabilities ls.HTTPMux.Handle("/process/request/", ls.SubmitJob()) @@ -633,6 +637,15 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler { }, } + //create a dataWriter for data channel if enabled + if enableData, ok := pipelineParams["enableData"]; ok { + if enableData == true || enableData == "true" { + params.liveParams.dataWriter = media.NewSegmentWriter(5) + pipelineParams["enableData"] = true + clog.Infof(ctx, "Data channel enabled for stream %s", streamName) + } + } + registerControl(ctx, params) // Create a special parent context for orchestrator cancellation @@ -735,6 +748,7 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams { return &liveRequestParams{ segmentReader: params.segmentReader, + dataWriter: params.dataWriter, rtmpOutputs: params.rtmpOutputs, localRTMPPrefix: params.localRTMPPrefix, stream: params.stream, @@ -757,6 +771,8 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{ resp := res.(*worker.GenLiveVideoToVideoResponse) host := params.liveParams.sess.Transcoder() + + //required channels pub, err := common.AppendHostname(resp.JSON200.PublishUrl, host) if err != nil { return fmt.Errorf("invalid publish URL: %w", err) @@ -773,16 +789,30 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{ if err != nil { return fmt.Errorf("invalid events URL: %w", err) } + if resp.JSON200.ManifestId != nil { ctx = clog.AddVal(ctx, "manifest_id", *resp.JSON200.ManifestId) params.liveParams.manifestID = *resp.JSON200.ManifestId } + clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events) startControlPublish(ctx, control, params) startTricklePublish(ctx, pub, params, params.liveParams.sess) startTrickleSubscribe(ctx, sub, params, params.liveParams.sess) startEventsSubscribe(ctx, events, params, params.liveParams.sess) + + //optional channels + var data *url.URL + if *resp.JSON200.DataUrl != "" { + data, err = common.AppendHostname(*resp.JSON200.DataUrl, host) + if err != nil { + return fmt.Errorf("invalid data URL: %w", err) + } + clog.V(common.VERBOSE).Infof(ctx, "data %s", data) + startDataSubscribe(ctx, data, params, params.liveParams.sess) + } + return nil } @@ -1094,6 +1124,15 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler { }, } + //create a dataWriter for data channel if enabled + if enableData, ok := pipelineParams["enableData"]; ok { + if enableData == true || enableData == "true" { + params.liveParams.dataWriter = media.NewSegmentWriter(5) + pipelineParams["enableData"] = true + clog.Infof(ctx, "Data channel enabled for stream %s", streamName) + } + } + registerControl(ctx, params) req := worker.GenLiveVideoToVideoJSONRequestBody{ @@ -1304,3 +1343,75 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler { }() }) } + +// @Summary Get Live Stream Data +// @Param stream path string true "Stream Key" +// @Success 200 +// @Router /live/video-to-video/{stream}/data [get] +func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + stream := r.PathValue("stream") + if stream == "" { + http.Error(w, "stream name is required", http.StatusBadRequest) + return + } + + ctx := r.Context() + ctx = clog.AddVal(ctx, "stream", stream) + + // Get the live pipeline for this stream + livePipeline, ok := ls.LivepeerNode.LivePipelines[stream] + if !ok { + http.Error(w, "Stream not found", http.StatusNotFound) + return + } + + // Get the data readerring buffer + if livePipeline.DataWriter == nil { + clog.Infof(ctx, "No data writer available for stream %s", stream) + http.Error(w, "Stream data not available", http.StatusServiceUnavailable) + return + } + dataReader := livePipeline.DataWriter.MakeReader(media.SegmentReaderConfig{}) + + // Set up SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + clog.Infof(ctx, "Starting SSE data stream for stream=%s", stream) + + // Listen for broadcast signals from ring buffer writes + // dataReader.Read() blocks on rb.cond.Wait() until startDataSubscribe broadcasts + for { + select { + case <-ctx.Done(): + clog.Info(ctx, "SSE data stream client disconnected") + return + default: + reader, err := dataReader.Next() + if err != nil { + if err == io.EOF { + // Stream ended + fmt.Fprintf(w, `event: end\ndata: {"type":"stream_ended"}\n\n`) + flusher.Flush() + return + } + clog.Errorf(ctx, "Error reading from ring buffer: %v", err) + return + } + + data, err := io.ReadAll(reader) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + } + }) +} diff --git a/server/ai_process.go b/server/ai_process.go index cc50b380dd..a0034e57a1 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -96,6 +96,7 @@ type aiRequestParams struct { // For live video pipelines type liveRequestParams struct { segmentReader *media.SwitchableSegmentReader + dataWriter *media.SegmentWriter stream string requestID string streamID string