Overhaul extractors.cr to use modules

This commit is contained in:
syeopite 2021-08-03 00:22:31 -07:00
parent 3dea670091
commit 142317c2be
No known key found for this signature in database
GPG Key ID: 6FA616E5A5294A82

View File

@ -3,257 +3,245 @@
# Tuple of Parsers/Extractors so we can easily cycle through them. # Tuple of Parsers/Extractors so we can easily cycle through them.
private ITEM_CONTAINER_EXTRACTOR = { private ITEM_CONTAINER_EXTRACTOR = {
YoutubeTabsExtractor.new, Extractors::YouTubeTabs,
SearchResultsExtractor.new, Extractors::SearchResults,
ContinuationExtractor.new, Extractors::Continuation,
} }
private ITEM_PARSERS = { private ITEM_PARSERS = {
VideoParser.new, Parsers::VideoRendererParser,
ChannelParser.new, Parsers::ChannelRendererParser,
GridPlaylistParser.new, Parsers::GridPlaylistRendererParser,
PlaylistParser.new, Parsers::PlaylistRendererParser,
CategoryParser.new, Parsers::CategoryRendererParser,
} }
private struct AuthorFallback record AuthorFallback, name : String? = nil, id : String? = nil
property name, id
def initialize(@name : String? = nil, @id : String? = nil)
end
end
# The following are the parsers for parsing raw item data into neatly packaged structs. # The following are the parsers for parsing raw item data into neatly packaged structs.
# They're accessed through the process() method which validates the given data as applicable # They're accessed through the process() method which validates the given data as applicable
# to their specific struct and then use the internal parse() method to assemble the struct # to their specific struct and then use the internal parse() method to assemble the struct
# specific to their category. # specific to their category.
private abstract struct ItemParser private module Parsers
# Base type for all item parsers. module VideoRendererParser
def process(item : JSON::Any, author_fallback : AuthorFallback) def self.process(item : JSON::Any, author_fallback : AuthorFallback)
end if item_contents = (item["videoRenderer"]? || item["gridVideoRenderer"]?)
return self.parse(item_contents, author_fallback)
private def parse(item_contents : JSON::Any, author_fallback : AuthorFallback)
end
end
private struct VideoParser < ItemParser
def process(item, author_fallback)
if item_contents = (item["videoRenderer"]? || item["gridVideoRenderer"]?)
return self.parse(item_contents, author_fallback)
end
end
private def parse(item_contents, author_fallback)
video_id = item_contents["videoId"].as_s
title = item_contents["title"].try { |t| t["simpleText"]?.try &.as_s || t["runs"]?.try &.as_a.map(&.["text"].as_s).join("") } || ""
author_info = item_contents["ownerText"]?.try &.["runs"]?.try &.as_a?.try &.[0]?
author = author_info.try &.["text"].as_s || author_fallback.name || ""
author_id = author_info.try &.["navigationEndpoint"]?.try &.["browseEndpoint"]["browseId"].as_s || author_fallback.id || ""
published = item_contents["publishedTimeText"]?.try &.["simpleText"]?.try { |t| decode_date(t.as_s) } || Time.local
view_count = item_contents["viewCountText"]?.try &.["simpleText"]?.try &.as_s.gsub(/\D+/, "").to_i64? || 0_i64
description_html = item_contents["descriptionSnippet"]?.try { |t| parse_content(t) } || ""
length_seconds = item_contents["lengthText"]?.try &.["simpleText"]?.try &.as_s.try { |t| decode_length_seconds(t) } ||
item_contents["thumbnailOverlays"]?.try &.as_a.find(&.["thumbnailOverlayTimeStatusRenderer"]?).try &.["thumbnailOverlayTimeStatusRenderer"]?
.try &.["text"]?.try &.["simpleText"]?.try &.as_s.try { |t| decode_length_seconds(t) } || 0
live_now = false
paid = false
premium = false
premiere_timestamp = item_contents["upcomingEventData"]?.try &.["startTime"]?.try { |t| Time.unix(t.as_s.to_i64) }
item_contents["badges"]?.try &.as_a.each do |badge|
b = badge["metadataBadgeRenderer"]
case b["label"].as_s
when "LIVE NOW"
live_now = true
when "New", "4K", "CC"
# TODO
when "Premium"
# TODO: Potentially available as item_contents["topStandaloneBadge"]["metadataBadgeRenderer"]
premium = true
else nil # Ignore
end end
end end
SearchVideo.new({ private def self.parse(item_contents, author_fallback)
title: title, video_id = item_contents["videoId"].as_s
id: video_id, title = item_contents["title"].try { |t| t["simpleText"]?.try &.as_s || t["runs"]?.try &.as_a.map(&.["text"].as_s).join("") } || ""
author: author,
ucid: author_id,
published: published,
views: view_count,
description_html: description_html,
length_seconds: length_seconds,
live_now: live_now,
premium: premium,
premiere_timestamp: premiere_timestamp,
})
end
end
private struct ChannelParser < ItemParser author_info = item_contents["ownerText"]?.try &.["runs"]?.try &.as_a?.try &.[0]?
def process(item, author_fallback) author = author_info.try &.["text"].as_s || author_fallback.name || ""
if item_contents = (item["channelRenderer"]? || item["gridChannelRenderer"]?) author_id = author_info.try &.["navigationEndpoint"]?.try &.["browseEndpoint"]["browseId"].as_s || author_fallback.id || ""
return self.parse(item_contents, author_fallback)
end
end
private def parse(item_contents, author_fallback) published = item_contents["publishedTimeText"]?.try &.["simpleText"]?.try { |t| decode_date(t.as_s) } || Time.local
author = item_contents["title"]["simpleText"]?.try &.as_s || author_fallback.name || "" view_count = item_contents["viewCountText"]?.try &.["simpleText"]?.try &.as_s.gsub(/\D+/, "").to_i64? || 0_i64
author_id = item_contents["channelId"]?.try &.as_s || author_fallback.id || "" description_html = item_contents["descriptionSnippet"]?.try { |t| parse_content(t) } || ""
length_seconds = item_contents["lengthText"]?.try &.["simpleText"]?.try &.as_s.try { |t| decode_length_seconds(t) } ||
item_contents["thumbnailOverlays"]?.try &.as_a.find(&.["thumbnailOverlayTimeStatusRenderer"]?).try &.["thumbnailOverlayTimeStatusRenderer"]?
.try &.["text"]?.try &.["simpleText"]?.try &.as_s.try { |t| decode_length_seconds(t) } || 0
author_thumbnail = item_contents["thumbnail"]["thumbnails"]?.try &.as_a[0]?.try &.["url"]?.try &.as_s || "" live_now = false
subscriber_count = item_contents["subscriberCountText"]?.try &.["simpleText"]?.try &.as_s.try { |s| short_text_to_number(s.split(" ")[0]) } || 0 paid = false
premium = false
auto_generated = false premiere_timestamp = item_contents["upcomingEventData"]?.try &.["startTime"]?.try { |t| Time.unix(t.as_s.to_i64) }
auto_generated = true if !item_contents["videoCountText"]?
video_count = item_contents["videoCountText"]?.try &.["runs"].as_a[0]?.try &.["text"].as_s.gsub(/\D/, "").to_i || 0
description_html = item_contents["descriptionSnippet"]?.try { |t| parse_content(t) } || ""
SearchChannel.new({ item_contents["badges"]?.try &.as_a.each do |badge|
author: author, b = badge["metadataBadgeRenderer"]
ucid: author_id, case b["label"].as_s
author_thumbnail: author_thumbnail, when "LIVE NOW"
subscriber_count: subscriber_count, live_now = true
video_count: video_count, when "New", "4K", "CC"
description_html: description_html, # TODO
auto_generated: auto_generated, when "Premium"
}) # TODO: Potentially available as item_contents["topStandaloneBadge"]["metadataBadgeRenderer"]
end premium = true
end else nil # Ignore
end
end
private struct GridPlaylistParser < ItemParser SearchVideo.new({
def process(item, author_fallback) title: title,
if item_contents = item["gridPlaylistRenderer"]? id: video_id,
return self.parse(item_contents, author_fallback) author: author,
end ucid: author_id,
end published: published,
views: view_count,
private def parse(item_contents, author_fallback) description_html: description_html,
title = item_contents["title"]["runs"].as_a[0]?.try &.["text"].as_s || "" length_seconds: length_seconds,
plid = item_contents["playlistId"]?.try &.as_s || "" live_now: live_now,
premium: premium,
video_count = item_contents["videoCountText"]["runs"].as_a[0]?.try &.["text"].as_s.gsub(/\D/, "").to_i || 0 premiere_timestamp: premiere_timestamp,
playlist_thumbnail = item_contents["thumbnail"]["thumbnails"][0]?.try &.["url"]?.try &.as_s || ""
SearchPlaylist.new({
title: title,
id: plid,
author: author_fallback.name || "",
ucid: author_fallback.id || "",
video_count: video_count,
videos: [] of SearchPlaylistVideo,
thumbnail: playlist_thumbnail,
})
end
end
private struct PlaylistParser < ItemParser
def process(item, author_fallback)
if item_contents = item["playlistRenderer"]?
return self.parse(item_contents, author_fallback)
end
end
def parse(item_contents, author_fallback)
title = item_contents["title"]["simpleText"]?.try &.as_s || ""
plid = item_contents["playlistId"]?.try &.as_s || ""
video_count = item_contents["videoCount"]?.try &.as_s.to_i || 0
playlist_thumbnail = item_contents["thumbnails"].as_a[0]?.try &.["thumbnails"]?.try &.as_a[0]?.try &.["url"].as_s || ""
author_info = item_contents["shortBylineText"]?.try &.["runs"]?.try &.as_a?.try &.[0]?
author = author_info.try &.["text"].as_s || author_fallback.name || ""
author_id = author_info.try &.["navigationEndpoint"]?.try &.["browseEndpoint"]["browseId"].as_s || author_fallback.id || ""
videos = item_contents["videos"]?.try &.as_a.map do |v|
v = v["childVideoRenderer"]
v_title = v["title"]["simpleText"]?.try &.as_s || ""
v_id = v["videoId"]?.try &.as_s || ""
v_length_seconds = v["lengthText"]?.try &.["simpleText"]?.try { |t| decode_length_seconds(t.as_s) } || 0
SearchPlaylistVideo.new({
title: v_title,
id: v_id,
length_seconds: v_length_seconds,
}) })
end || [] of SearchPlaylistVideo
# TODO: item_contents["publishedTimeText"]?
SearchPlaylist.new({
title: title,
id: plid,
author: author,
ucid: author_id,
video_count: video_count,
videos: videos,
thumbnail: playlist_thumbnail,
})
end
end
private struct CategoryParser < ItemParser
def process(item, author_fallback)
if item_contents = item["shelfRenderer"]?
return self.parse(item_contents, author_fallback)
end end
end end
def parse(item_contents, author_fallback) module ChannelRendererParser
# Title extraction is a bit complicated. There are two possible routes for it def self.process(item : JSON::Any, author_fallback : AuthorFallback)
# as well as times when the title attribute just isn't sent by YT. if item_contents = (item["channelRenderer"]? || item["gridChannelRenderer"]?)
title_container = item_contents["title"]? || "" return self.parse(item_contents, author_fallback)
if !title_container.is_a? String end
if title = title_container["simpleText"]? end
title = title.as_s
private def self.parse(item_contents, author_fallback)
author = item_contents["title"]["simpleText"]?.try &.as_s || author_fallback.name || ""
author_id = item_contents["channelId"]?.try &.as_s || author_fallback.id || ""
author_thumbnail = item_contents["thumbnail"]["thumbnails"]?.try &.as_a[0]?.try &.["url"]?.try &.as_s || ""
subscriber_count = item_contents["subscriberCountText"]?.try &.["simpleText"]?.try &.as_s.try { |s| short_text_to_number(s.split(" ")[0]) } || 0
auto_generated = false
auto_generated = true if !item_contents["videoCountText"]?
video_count = item_contents["videoCountText"]?.try &.["runs"].as_a[0]?.try &.["text"].as_s.gsub(/\D/, "").to_i || 0
description_html = item_contents["descriptionSnippet"]?.try { |t| parse_content(t) } || ""
SearchChannel.new({
author: author,
ucid: author_id,
author_thumbnail: author_thumbnail,
subscriber_count: subscriber_count,
video_count: video_count,
description_html: description_html,
auto_generated: auto_generated,
})
end
end
module GridPlaylistRendererParser
def self.process(item : JSON::Any, author_fallback : AuthorFallback)
if item_contents = item["gridPlaylistRenderer"]?
return self.parse(item_contents, author_fallback)
end
end
private def self.parse(item_contents, author_fallback)
title = item_contents["title"]["runs"].as_a[0]?.try &.["text"].as_s || ""
plid = item_contents["playlistId"]?.try &.as_s || ""
video_count = item_contents["videoCountText"]["runs"].as_a[0]?.try &.["text"].as_s.gsub(/\D/, "").to_i || 0
playlist_thumbnail = item_contents["thumbnail"]["thumbnails"][0]?.try &.["url"]?.try &.as_s || ""
SearchPlaylist.new({
title: title,
id: plid,
author: author_fallback.name || "",
ucid: author_fallback.id || "",
video_count: video_count,
videos: [] of SearchPlaylistVideo,
thumbnail: playlist_thumbnail,
})
end
end
module PlaylistRendererParser
def self.process(item : JSON::Any, author_fallback : AuthorFallback)
if item_contents = item["playlistRenderer"]?
return self.parse(item_contents, author_fallback)
end
end
private def self.parse(item_contents, author_fallback)
title = item_contents["title"]["simpleText"]?.try &.as_s || ""
plid = item_contents["playlistId"]?.try &.as_s || ""
video_count = item_contents["videoCount"]?.try &.as_s.to_i || 0
playlist_thumbnail = item_contents["thumbnails"].as_a[0]?.try &.["thumbnails"]?.try &.as_a[0]?.try &.["url"].as_s || ""
author_info = item_contents["shortBylineText"]?.try &.["runs"]?.try &.as_a?.try &.[0]?
author = author_info.try &.["text"].as_s || author_fallback.name || ""
author_id = author_info.try &.["navigationEndpoint"]?.try &.["browseEndpoint"]["browseId"].as_s || author_fallback.id || ""
videos = item_contents["videos"]?.try &.as_a.map do |v|
v = v["childVideoRenderer"]
v_title = v["title"]["simpleText"]?.try &.as_s || ""
v_id = v["videoId"]?.try &.as_s || ""
v_length_seconds = v["lengthText"]?.try &.["simpleText"]?.try { |t| decode_length_seconds(t.as_s) } || 0
SearchPlaylistVideo.new({
title: v_title,
id: v_id,
length_seconds: v_length_seconds,
})
end || [] of SearchPlaylistVideo
# TODO: item_contents["publishedTimeText"]?
SearchPlaylist.new({
title: title,
id: plid,
author: author,
ucid: author_id,
video_count: video_count,
videos: videos,
thumbnail: playlist_thumbnail,
})
end
end
module CategoryRendererParser
def self.process(item : JSON::Any, author_fallback : AuthorFallback)
if item_contents = item["shelfRenderer"]?
return self.parse(item_contents, author_fallback)
end
end
private def self.parse(item_contents, author_fallback)
# Title extraction is a bit complicated. There are two possible routes for it
# as well as times when the title attribute just isn't sent by YT.
title_container = item_contents["title"]? || ""
if !title_container.is_a? String
if title = title_container["simpleText"]?
title = title.as_s
else
title = title_container["runs"][0]["text"].as_s
end
else else
title = title_container["runs"][0]["text"].as_s title = ""
end end
else
title = ""
end
url = item_contents["endpoint"]?.try &.["commandMetadata"]["webCommandMetadata"]["url"].as_s url = item_contents["endpoint"]?.try &.["commandMetadata"]["webCommandMetadata"]["url"].as_s
# Sometimes a category can have badges. # Sometimes a category can have badges.
badges = [] of Tuple(String, String) # (Badge style, label) badges = [] of Tuple(String, String) # (Badge style, label)
item_contents["badges"]?.try &.as_a.each do |badge| item_contents["badges"]?.try &.as_a.each do |badge|
badge = badge["metadataBadgeRenderer"] badge = badge["metadataBadgeRenderer"]
badges << {badge["style"].as_s, badge["label"].as_s} badges << {badge["style"].as_s, badge["label"].as_s}
end
# Category description
description_html = item_contents["subtitle"]?.try { |desc| parse_content(desc) } || ""
# Content parsing
contents = [] of SearchItem
# Content could be in three locations.
if content_container = item_contents["content"]["horizontalListRenderer"]?
elsif content_container = item_contents["content"]["expandedShelfContentsRenderer"]?
elsif content_container = item_contents["content"]["verticalListRenderer"]?
else
content_container = item_contents["contents"]
end
raw_contents = content_container["items"].as_a
raw_contents.each do |item|
result = extract_item(item)
if !result.nil?
contents << result
end end
end
Category.new({ # Category description
title: title, description_html = item_contents["subtitle"]?.try { |desc| parse_content(desc) } || ""
contents: contents,
description_html: description_html, # Content parsing
url: url, contents = [] of SearchItem
badges: badges,
}) # Content could be in three locations.
if content_container = item_contents["content"]["horizontalListRenderer"]?
elsif content_container = item_contents["content"]["expandedShelfContentsRenderer"]?
elsif content_container = item_contents["content"]["verticalListRenderer"]?
else
content_container = item_contents["contents"]
end
raw_contents = content_container["items"].as_a
raw_contents.each do |item|
result = extract_item(item)
if !result.nil?
contents << result
end
end
Category.new({
title: title,
contents: contents,
description_html: description_html,
url: url,
badges: badges,
})
end
end end
end end
@ -262,88 +250,82 @@ end
# a structure we can more easily use via the parsers above. Their internals are # a structure we can more easily use via the parsers above. Their internals are
# identical to the item parsers. # identical to the item parsers.
private abstract struct ItemsContainerExtractor private module Extractors
def process(item : Hash(String, JSON::Any)) module YouTubeTabs
end def self.process(initial_data : Hash(String, JSON::Any))
if target = initial_data["twoColumnBrowseResultsRenderer"]?
private def extract(target : JSON::Any) self.extract(target)
end
end
private struct YoutubeTabsExtractor < ItemsContainerExtractor
def process(initial_data)
if target = initial_data["twoColumnBrowseResultsRenderer"]?
self.extract(target)
end
end
private def extract(target)
raw_items = [] of JSON::Any
selected_tab = extract_selected_tab(target["tabs"])
content = selected_tab["content"]
content["sectionListRenderer"]["contents"].as_a.each do |renderer_container|
renderer_container = renderer_container["itemSectionRenderer"]
renderer_container_contents = renderer_container["contents"].as_a[0]
# Category extraction
if items_container = renderer_container_contents["shelfRenderer"]?
raw_items << renderer_container_contents
next
elsif items_container = renderer_container_contents["gridRenderer"]?
else
items_container = renderer_container_contents
end
items_container["items"].as_a.each do |item|
raw_items << item
end end
end end
return raw_items private def self.extract(target)
end raw_items = [] of JSON::Any
end selected_tab = extract_selected_tab(target["tabs"])
content = selected_tab["content"]
private struct SearchResultsExtractor < ItemsContainerExtractor content["sectionListRenderer"]["contents"].as_a.each do |renderer_container|
def process(initial_data) renderer_container = renderer_container["itemSectionRenderer"]
if target = initial_data["twoColumnSearchResultsRenderer"]? renderer_container_contents = renderer_container["contents"].as_a[0]
self.extract(target)
# Category extraction
if items_container = renderer_container_contents["shelfRenderer"]?
raw_items << renderer_container_contents
next
elsif items_container = renderer_container_contents["gridRenderer"]?
else
items_container = renderer_container_contents
end
items_container["items"].as_a.each do |item|
raw_items << item
end
end
return raw_items
end end
end end
private def extract(target) module SearchResults
raw_items = [] of Array(JSON::Any) def self.process(initial_data : Hash(String, JSON::Any))
content = target["primaryContents"] if target = initial_data["twoColumnSearchResultsRenderer"]?
renderer = content["sectionListRenderer"]["contents"].as_a.each do |node| self.extract(target)
if node = node["itemSectionRenderer"]?
raw_items << node["contents"].as_a
end end
end end
raw_items = raw_items.flatten private def self.extract(target)
raw_items = [] of Array(JSON::Any)
content = target["primaryContents"]
renderer = content["sectionListRenderer"]["contents"].as_a.each do |node|
if node = node["itemSectionRenderer"]?
raw_items << node["contents"].as_a
end
end
return raw_items raw_items = raw_items.flatten
end
end
private struct ContinuationExtractor < ItemsContainerExtractor return raw_items
def process(initial_data)
if target = initial_data["continuationContents"]?
self.extract(target)
elsif target = initial_data["appendContinuationItemsAction"]?
self.extract(target)
end end
end end
private def extract(target) module Continuation
raw_items = [] of JSON::Any def self.process(initial_data : Hash(String, JSON::Any))
if content = target["gridContinuation"]? if target = initial_data["continuationContents"]?
raw_items = content["items"].as_a self.extract(target)
elsif content = target["continuationItems"]? elsif target = initial_data["appendContinuationItemsAction"]?
raw_items = content.as_a self.extract(target)
end
end end
return raw_items private def self.extract(target)
raw_items = [] of JSON::Any
if content = target["gridContinuation"]?
raw_items = content["items"].as_a
elsif content = target["continuationItems"]?
raw_items = content.as_a
end
return raw_items
end
end end
end end