-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_data.py
More file actions
172 lines (128 loc) · 4.69 KB
/
process_data.py
File metadata and controls
172 lines (128 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from youtube_transcript_api import YouTubeTranscriptApi
import pandas as pd
import requests
import json
import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from tqdm.auto import tqdm
from uuid import uuid4
import os
from dotenv import load_dotenv
load_dotenv()
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain.vectorstores import Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pinecone
from tqdm.auto import tqdm
from uuid import uuid4
import scrapetube
# =============================================================================
# DATA PREPARATION
# =============================================================================
def get_youtube_transcript(video_id):
    """Fetch a video's transcript and flatten it into a single string.

    Each caption segment is rendered as ``"<text> <<start-time>>"`` so that
    downstream chunks keep a timestamp pointer back into the video.

    Args:
        video_id: The YouTube video id (the ``v=`` query parameter).

    Returns:
        The concatenated transcript string, or ``False`` when no transcript
        (native or translatable) is available.
    """
    url = "https://www.youtube.com/watch?v=" + video_id
    try:
        # Prefer the video's default transcript.
        raw = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        # Fall back to translating the first listed transcript to English.
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # next() raises StopIteration when the list is empty; the
            # original for/break left `raw` unassigned in that case and
            # crashed with NameError below, outside the try.
            raw = next(iter(transcript_list)).translate('en').fetch()
        except Exception:
            print(f"No transcript found for {url}")
            return False
    df = pd.DataFrame(raw)
    # Append each segment's start time in angle brackets, e.g. "hello <12.34>".
    df["transcript"] = df["text"] + '<' + df['start'].astype(str) + '>'
    transcript = df['transcript'].str.cat(sep=' ')
    return transcript
def get_youtube_data(video_id):
    """Look up a video's title and author via the noembed.com oEmbed proxy.

    Args:
        video_id: The YouTube video id.

    Returns:
        A ``(title, author, url)`` tuple. Single quotes are stripped from
        title and author because ``'`` is a reserved character downstream.

    Raises:
        requests.HTTPError: If the noembed request fails.
        KeyError: If the response carries no title/author (unknown video).
    """
    url = "https://www.youtube.com/watch?v=" + video_id
    # Pass the URL through `params=` so requests URL-encodes it; interpolating
    # it into the query string leaves its '?' and '=' characters unescaped.
    response = requests.get(
        "https://noembed.com/embed",
        params={"dataType": "json", "url": url},
    )
    response.raise_for_status()
    data = response.json()
    title, author = data["title"], data["author_name"]
    print(title, author)
    # ' is a reserved character
    title = title.replace("'", "")
    author = author.replace("'", "")
    return title, author, url
def create_index(video_id):
    """Embed one video's transcript and upsert it into the Pinecone index.

    Splits the transcript into overlapping character chunks, embeds them with
    disk-cached OpenAI embeddings, and upserts the vectors tagged with the
    video's title/author/link metadata.

    Args:
        video_id: The YouTube video id.

    Returns:
        ``False`` if the video has no transcript; otherwise the LangChain
        ``Pinecone`` vector store wrapping the populated index (the original
        built this wrapper and discarded it).
    """
    transcript = get_youtube_transcript(video_id)
    # `is False` distinguishes the sentinel from other falsy values such as "".
    if transcript is False:
        return False
    title, author, url = get_youtube_data(video_id)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,       # the character length of the chunk
        chunk_overlap=100,     # the character length of the overlap between chunks
        length_function=len,   # measure length in characters (python len())
        separators=["\n\n", "\n", " ", ""]
    )

    index_name = 'youtube-index'
    pinecone.init(
        api_key=os.getenv('PINECONE_API_KEY'),
        environment=os.getenv('PINECONE_ENV')
    )
    if index_name not in pinecone.list_indexes():
        # First run: create the index sized for OpenAI ada-002 embeddings.
        pinecone.create_index(
            name=index_name,
            metric='cosine',
            dimension=1536
        )
    index = pinecone.Index(index_name)

    # Cache embeddings on disk so re-embedding identical text is free.
    store = LocalFileStore("./cache/")
    core_embeddings_model = OpenAIEmbeddings()
    embedder = CacheBackedEmbeddings.from_bytes_store(
        core_embeddings_model,
        store,
        namespace=core_embeddings_model.model
    )

    metadata = {
        'source_document': title,
        'link': url,
        'author': author
    }
    texts = text_splitter.split_text(transcript)
    metadatas = [{
        "chunk": j, "text": text, **metadata
    } for j, text in enumerate(texts)]

    # Upsert in fixed-size slices: the original accumulated every chunk and
    # upserted them in one call, so a long transcript could exceed the
    # intended per-request batch limit.
    BATCH_LIMIT = 100
    for start in range(0, len(texts), BATCH_LIMIT):
        batch_texts = texts[start:start + BATCH_LIMIT]
        batch_metadatas = metadatas[start:start + BATCH_LIMIT]
        ids = [str(uuid4()) for _ in batch_texts]
        embeds = embedder.embed_documents(batch_texts)
        index.upsert(vectors=zip(ids, embeds, batch_metadatas))

    text_field = "text"
    return Pinecone(
        index,
        embedder,
        text_field
    )
# =============================================================================
#
# =============================================================================
def index_channel(channel_id):
    """Create a transcript index for every upload and past live stream on a channel."""
    # Regular uploads.
    for upload in scrapetube.get_channel(channel_id):
        create_index(upload['videoId'])
    # scrapetube lists past live streams under a separate content type.
    for stream in scrapetube.get_channel(channel_id, content_type='streams'):
        create_index(stream['videoId'])
def index_video(video_url):
    """Index a single video given its full YouTube URL.

    Handles standard ``watch?v=<id>`` URLs even when extra query parameters
    are present (the original ``split('=')[-1]`` returned ``30s`` for
    ``...watch?v=abc&t=30s``), as well as ``youtu.be/<id>`` short links.

    Args:
        video_url: A YouTube video URL.
    """
    from urllib.parse import urlparse, parse_qs

    parsed = urlparse(video_url)
    query = parse_qs(parsed.query)
    if "v" in query:
        video_id = query["v"][0]
    elif parsed.netloc.endswith("youtu.be"):
        # Short links carry the id as the path: https://youtu.be/<id>
        video_id = parsed.path.lstrip("/")
    else:
        # Fall back to the original heuristic for unrecognized forms.
        video_id = video_url.split('=')[-1]
    print(video_id)
    create_index(video_id)