Как парсить вк python
Перейти к содержимому

Как парсить вк python

  • автор:

Парсинг данных через api vk и google sheets api на python

Появилась потребность собирать статистику постов из группы в контакте и затем проанализировать реакции подписчиков на конкретные посты. Если переформулировать на выходе стоит задача с заданной периодичностью снимать показания статистики постов в вк и сохранять их.

Я не профессиональный программист и не претендую, поэтому решил сделать все довольно просто. При помощи api VK забирать посты из группы, собираю нужный мне датафрейм и записываю данные в гугл таблицу, так же через api.

Может быть это и не самое оптимальное решение,

Настраиваем API VK

В этом блоке мы хотим собрать статистику постов из группы vk.

Для начала работы нам нужен user_token из vk. Мне понравилась видеоинструкция здесь, коротко и по делу.

Токен держим в секрете. Переходим в https://dev.vk.com изучаем документацию API.

Прямо на сайте документации можем попробовать дернуть запрос.

Для этого нам нужно access_token, domain, count, v, filter.

access_token – получили на прошлом шаге. domain – название группы вы увидите в url название группы например https://vk.com/adminsclub. count – количество постов которые можем дернуть. v – версия api. filter – хотим получить только посты от группы устанавливаем owner.

vk можно попробовать, как работает сам метод.

Прописываем логику сбора

Импортируем библиотеку requests. Дергаем тестовый запрос. Поcле анализа структуры решаю, что мне нужен раздел items

# переменные TOKEN_USER = #ваш токен VERSION = #версися api vk DOMAIN = #ваш domain # через api vk вызываем статистику постов response = requests.get('https://api.vk.com/method/wall.get', params=) data = response.json()['response']['items']

Отдельное поле в статистики количество фотографий для поста, я не нашел.

Через цикл перебираем каждый пост и считаем количество фото, если фотографии нет скрипт ловит ошибку. Обрабатываем ошибку и ставим 0. Собираем новый список с полями id поста и количество фото.

Пишем обработчик. Вызываем pandas

# считаем сколько фото у поста, заводи все в df id = [] photo = [] for post in data: id.append(post['id']) try: photo.append(len(post['attachments'])) except: photo.append(0) df_photo = pd.DataFrame( )

Переводим cловарь в df. Импортируем метод from pandas import json_normalize

Оставляем нужные атрибуты и переводим дату в другой формат.

В переменной post_id запихиваем id наших постов.

Я бы хотел обогатить свою статистику более расширенными измерениями

Из документации по api о которой рассказывал выше подобрал метод status.getPostReach

В методе обнаружил новый аргумент owner_id, его можно найти в настройках группы.

Делаем еще один запрос и новые данные сохраняем в датафрейм df_stat_post

# вытаскиваем нужные нам столбцы и переводим формат даты df = json_normalize(data) df = df[['id','date','comments.count','likes.count','reposts.count','reposts.wall_count','reposts.mail_count','views.count','text']] df['date']= [datetime.fromtimestamp(df['date'][i]) for i in range(len(df['date']))] # для каждого поста вытаскиваем дополнительную статистику post_id = ','.join(df['id'].astype("str")) response = requests.get('https://api.vk.com/method/stats.getPostReach', params=) data = response.json()['response'] df_stat_post = json_normalize(data)

Теперь приступим к сборке объединяем все наши датафреймы, накидываем дополнительные метрики.

Далее наши данные преобразовываем для загрузки в гугл таблицу.

# объединяем все df cо всеми статистиками и количествам фото df_final = df.merge(df_stat_post, how='left', left_on='id', right_on="post_id") df_final = df_final.merge(df_photo, how='left', left_on='id', right_on="id") df_final.drop(columns='post_id',inplace=True) # добавляем дополнительные столбцы с временем df_final['date_time_report'] = datetime.now() df_final['date_report'] = date.today() df_final['year'] = df_final['date_time_report'].dt.year df_final['month'] = df_final['date_time_report'].dt.month df_final['day'] = df_final['date_time_report'].dt.day df_final['hour'] = df_final['date_time_report'].dt.hour df_final['minute'] = df_final['date_time_report'].dt.minute df_final[['date','date_report','date_time_report']] = df_final[['date','date_report','date_time_report']].astype('str') # сохраняем все значения data_list = df_final.values.tolist()

Грузим в google sheet через api

Есть готовые библиотеки для работы с google sheet например pygsheets, но мне было важно поработать с API поэтому легких путей не искал.

Прежде чем загрузить надо настроить наш api прекрасная статья, в который пошагово написано и даст возможность поиграться с листами https://habr.com/ru/post/483302/

# подключаемся к гугл таблице CREDENTIALS_FILE = # Имя файла с закрытым ключом, вы должны подставить свое # Читаем ключи из файла credentials = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIALS_FILE, ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']) httpAuth = credentials.authorize(httplib2.Http()) # Авторизуемся в системе service = apiclient.discovery.build('sheets', 'v4', http = httpAuth) # Выбираем работу с таблицами и 4 версию API spreadsheetId = # ваш id лист

После подключения к листу. Находим последнюю заполненную строку.

В моем примере я заполняю последние 10 строк ровно по количеству постов которые я получил из get запроса. Подготавливаем шаблон для запроса, заполняем шаблон данными какие ячейки заполняем и заполняем. Далее выполняем запрос. Готово

# находим последнию строку заполненную response = service.spreadsheets().values().get(spreadsheetId = spreadsheetId,range="Лист номер один!A1:A").execute() # последние 10 строк заполняем number_sheet = "Лист номер один!A" + str(len(response['values'])+1) + ':AA' + str(len(response['values'])+10) # создаем запрос и вставляем туда данные data_vk = < "valueInputOption": "USER_ENTERED", # Данные воспринимаются, как вводимые пользователем (считается значение формул) "data": [ ] > data_vk['data'][0]['range'] = number_sheet data_vk['data'][0]['values'] = data_list # выполняем запрос results = service.spreadsheets().values().batchUpdate(spreadsheetId = spreadsheetId, body = data_vk).execute()

Заключение

После написания этого кода мне требовалось запускать его каждый час и принял решение арендовать сервер, установить туда docker и через crontab запускать.

Парсинг целевой аудитории ВКонтакте

При размещении рекламы некоторые площадки в настройках аудитории позволяют загрузить список конкретных людей, которые увидят рекламу. Для парсинга id по конкретным пабликам существуют специальные инструменты, но куда интереснее (и дешевле) сделать это собственноручно при помощи Python и VK API. Сегодня расскажем, как для рекламной кампании LEFTJOIN мы спарсили целевую аудиторию и загрузили её в рекламный кабинет.

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

Парсинг пользователей

Для отправки запросов потребуется токен пользователя и список пабликов, чьих участников мы хотим получить. Мы собрали около 30 сообществ, посвящённых аналитике, BI-инструментам и Data Science.

import requests import time group_list = ['datacampus', '185023286', 'data_mining_in_action', '223456', '187222444', 'nta_ds_ai', 'business__intelligence', 'club1981711', 'datascience', 'ozonmasters', 'businessanalysts', 'datamining.team', 'club.shad', '174278716', 'sqlex', 'sql_helper', 'odssib', 'sapbi', 'sql_learn', 'hsespbcareer', 'smartdata', 'pomoshch_s_spss', 'dwhexpert', 'k0d_ds', 'sql_ex_ru', 'datascience_ai', 'data_club', 'mashinnoe_obuchenie_ai_big_data', 'womeninbigdata', 'introstats', 'smartdata', 'data_mining_in_action', 'dlschool_mipt'] token = 'ваш_токен'

Запрос на получение участников сообщества к API ВКонтакте вернёт максимум 1000 строк — для получения последующих тысяч потребуется смещать параметр offset на единицу. Но нужно знать, до какого момента это делать — поэтому опишем функцию, которая принимает id сообщества, получает информацию о числе участников сообщества и возвращает максимальное значение для offset — отношение числа участников к 1000, ведь мы можем получить ровно тысячу человек за раз.

def get_offset(group_id): count = requests.get('https://api.vk.com/method/groups.getMembers', params=< 'access_token':token, 'v':5.103, 'group_id': group_id, 'sort':'id_desc', 'offset':0, 'fields':'last_seen' >).json()['response']['count'] return count // 1000

Следующим этапом опишем функцию, которая принимает id сообщества, собирает в один список id всех подписчиков и возвращает его. Для этого отправляем запросы на получение 1000 человек, пока не кончается offset, вносим данные в список и возвращаем его. Проходя по каждому человеку дополнительно проверяем дату его последнего посещения социальной сети — если он не заходил с середины ноября, добавлять его не будем. Время указывается в формате unixtime.

def get_users(group_id): good_id_list = [] offset = 0 max_offset = get_offset(group_id) while offset < max_offset: response = requests.get('https://api.vk.com/method/groups.getMembers', params=< 'access_token':token, 'v':5.103, 'group_id': group_id, 'sort':'id_desc', 'offset':offset, 'fields':'last_seen' >).json()['response'] offset += 1 for item in response['items']: try: if item['last_seen']['time'] >= 1605571200: good_id_list.append(item['id']) except Exception as E: continue return good_id_list

Теперь пройдём по всем сообществам из списка и для каждого соберём участников, а затем внесём их в общий список all_users. В конце переводим сначала список в множество, а затем опять в список, чтобы избавиться от возможных дубликатов: одни и те же люди могли быть участниками разных пабликов. Лишним не будет после каждого паблика приостановить работу программы на секунду, чтобы не столкнуться с ограничениями на число запросов.

all_users = [] for group in group_list: print(group) try: users = get_users(group) all_users.extend(users) time.sleep(1) except KeyError as E: print(group, E) continue all_users = list(set(all_users))

Последним шагом записываем каждого пользователя в файл с новой строки.

with open('users.txt', 'w') as f: for item in all_users: f.write("%s\n" % item)

Аудитория в рекламном кабинете из файла

Переходим в свой рекламный кабинет ВКонтакте и заходим во вкладку «Ретаргетинг». Там будем кнопка «Создать аудиторию»:

После нажатия на неё откроется новое окно, где можно будет выбрать в качестве источника файл и указать название для аудитории:

После загрузки пройдёт несколько секунд и аудитория будет доступна. Первые минут 10 будет указано, что аудитория слишком мала: это не так и панель вскоре обновится, если в вашей аудитории действительно более 100 человек.

Итоги

Сравним среднюю стоимость привлечённого в наше сообщество участника в объявлении с автоматической настройкой аудитории и в объявлении, аудиторию для которого мы спарсили. В первом случае получаем среднюю стоимость в 52,4 рубля, а во втором — в 33,2 рубля. Подбор качественной аудитории при помощи методов парсинга данных из ВКонтакте помог снизить среднюю стоимость на 37%.

Для рекламной кампании мы подготовили такой пост (нажмите на картинку, чтобы перейти к нему):

Пишем простой парсер ВК на Python

Парсер — программа, которая по заданному алгоритму собирает нужную информацию на сайте. Парсеры часто применяют в маркетинговых целях, чтобы собрать базу для таргетированной рекламы или спам рассылки. Наш парсер будет способен формировать базу участников выбранного сообщества, сохранять базы в txt файле, высчитывать процент пересечения двух баз, объединять две базы в одну без повторов, а также будет доступен ввод сохраненной базы из txt файла.

Подготовка

Для взаимодействия с API Вконтакте мы будем использовать модуль vk. Установить vk можно через pip с помощью команды: pip install vk

Также, чтобы получить сервисный ключ доступа, нам потребуется создать приложение на сайте Вконтакте. Это можно сделать по этой ссылке.

Пишем код

Первым делом давайте импортируем модуль vk, обозначим имена наших будущих функций и напишем авторизацию.

Проблема заключается в том, что этот метод может возвратить нам максимум 1000 id’шников. Но благодаря параметру offset, который отвечает за смещение от начала полного списка участников(по умолчанию 0), и циклу for, мы можем получить список всех участников сообщества.

На этом все! При желании, немного изменив код, можно собирать больше информации о пользователях.

Статья Загружаем видео из ВК с помощью Python

Загрузка видео на локальный диск, это дело хорошее. Потому, что у любого контента в интернете есть не очень хорошее свойство — рано или поздно он попросту теряется, либо его удаляют. Я немного покопался в коде страничек ВК и сделал небольшой скрипт, который загружает видео. А использовал я для этого Python.

kak-skatshat-video-vk_1588839602-1280x640.jpg

Что потребуется?

Для корректной работы скрипта нужно установить библиотеку requests, чтобы можно было выполнять запросы к странице и загружать видео. Также, для того, чтобы парсить содержимое полученных страниц нужно установить библиотеки BeautifulSoup и lxml. А для того, чтобы немного раскрасить вывод в терминале, установим библиотеку colorama. Для установки данных библиотек пишем в терминале команду:

pip install requests bs4 lxml colorama

После того, как необходимые библиотеки установятся, нужно импортировать их в скрипт. А также выполнить импорт библиотек, которые уже предустановлены вместе с python и также нужны для работы. Инициализируем colorama.

import os import re import shutil import sys import requests from bs4 import BeautifulSoup from colorama import Fore from colorama import init init()

Создадим заголовки для запросов. Для того, чтобы хотя бы немного походить на обычный браузер.

headers = < 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 ' 'Safari/537.36', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,' 'application/signed-exchange;v=b3;q=0.9 ' >

Вспомогательный скрипт замены символов

Для того, чтобы сохранить видео на диск, нужно получить его название. А вот с названием не всегда все бывает хорошо. Иногда в нем встречаются символы в виде «/», «?» и прочих, которые не дадут сохранить видео и вызовут исключение при сохранении файла. Вот для этого нужно сделать функцию, чтобы все эти символы по возможности заменить.

Создадим функцию rep_symbol(text: str). На вход она получает текст, в котором нужно произвести замену символов. После чего, с помощью функции replace выполним замену и возвратим уже очищенный текст из функции.

def rep_symbol(text: str): """ Функция выполняет замену символов в названии, которые могут помешать сохранению файла на диск. :param text: текст для замены символов. :return: модифицированный текст """ tex = text.replace("'", "").replace('"', '').replace('|', '_').replace(' | ', '_').replace('/', '_'). \ replace('\\', '_').replace('*', '_').replace('?', '').replace('', '_').replace(':', ''). \ replace(';', '').replace('.', '').replace(' ', '_').replace(')', '').replace('(', '').strip() return tex

Давайте двигаться дальше. Вспомогательная функция создана и теперь начнем создавать функции в порядке их использования.

Получение ссылки на плейлист со ссылками на фрагменты

Создадим функцию get_m3u8(url). На вход она получает ссылку на страницу с видео. Создадим словарь, в который будем складывать все найденные разрешения и ссылки на них. Выполним запрос к странице с видео, после чего передадим полученный код в BeautifulSoup, для парсинга. И сразу же найдем название видео, которое отправим в функцию замены символов. Забыл упомянуть ранее, что функция замены символов также заменяет пробелы на «_». Это необходимо для того, чтобы ffmpeg мог корректно обработать видео. Так как он не понимает русский язык да еще и с пробелами.

 res_m3u8 = dict() req_m3u8 = requests.get(url=url, headers=headers) soup = BeautifulSoup(req_m3u8.text, 'lxml') title = rep_symbol(soup.find('h1', class_='VideoPageInfoRow__title').text)

Теперь обернем весь остальной код в блок try-except. Это необходимо для обработки ошибки, когда не будет найден тег со ссылками на плейлисты. Такое случается, если попытаться скачать стрим. К сожалению, ссылок на фрагменты видео и плейлисты я не нашел, чтобы его скачивать.

Находим и забираем со страницы ссылку на плейлист, в котором содержаться ссылки на плейлисты видео с разным разрешением. Скачиваем его, так как это простой текст и разбиваем на строки.

 m3u8_link = soup.find('video', class_='vv_inline_video').find('source').get('src') m3u8_text = requests.get(url=m3u8_link, headers=headers).text.splitlines()

Запускаем цикл от 0 значения длины получившегося списка строк. Обрабатываем каждую строку. Проверяем, есть ли в начале строки текст «#EXT-X-STREAM», так как в этой строке содержится разрешение видео. С помощью регулярного выражения «r’RESOLUTION=*\w*’» находим строку с разрешением, разбиваем ее по «=», забираем 1 элемент, разбиваем его по «х» и забираем 0 элемент. После чего обновляем словарь, добавив в него разрешение и ссылку на плейлист с этим разрешением.

 m3u8_link = soup.find('video', class_='vv_inline_video').find('source').get('src') m3u8_text = requests.get(url=m3u8_link, headers=headers).text.splitlines() for num in range(0, len(m3u8_text)): if m3u8_text[num].startswith("#EXT-X-STREAM"): res_video = re.findall(r'RESOLUTION=*\w*', m3u8_text[num])[0].split("=")[1].split("x")[0] res_m3u8.update()

Создадим список, в который сложим разрешения из словаря, предварительно переведя их в int. Это будет необходимо для их сортировки, чтобы получить максимальное разрешение, которое доступно для данного видео. Для этого пробегаемся в цикле по ключам словаря, и добавляем их в список обернув в int. Затем печатаем сообщения в терминал. Возвращаем из функции ссылку на максимальное значение отсортированного списка и название видео.

 list_res = [] for res in res_m3u8.keys(): list_res.append(int(res)) print(Fore.GREEN + f'[+] Найдено видео: ""') print(Fore.GREEN + f'[+] Доступно для загрузки качество: ') return res_m3u8[str(sorted(list_res)[-1])], title

Код функции получения ссылки на плейлист со ссылками

def get_m3u8(url): """ Выполняем запрос к странице с видео. Получаем код со страницы. Ищем название видео, заменяем все знаки, которые могут помешать сохранить видео с этим названием и сохраняем значение в переменной title. Получаем ссылку на плейлист - m3u8_link, в котором содержаться ссылки на плейлисты с разным разрешением. Забираем все разрешения и ссылки из плейлиста. Добавляем в словарь - res_m3u8. В цикле забираем из словаря все ключи, которыми являются разрешения видео, добавляем в список list_res переведя в int. Возвращаем из функции ссылку на плейлист видео с самым большим разрешением, которую получаем путем сортировки списка с разрешениями, откуда забираем последний элемент и получаем ссылку из словаря. Также возвращаем название видео. :param url: ссылка на страницу с видео. :return: Возвращает ссылку на видео с самым большим разрешением и название. """ res_m3u8 = dict() req_m3u8 = requests.get(url=url, headers=headers) soup = BeautifulSoup(req_m3u8.text, 'lxml') title = rep_symbol(soup.find('h1', class_='VideoPageInfoRow__title').text) try: m3u8_link = soup.find('video', class_='vv_inline_video').find('source').get('src') m3u8_text = requests.get(url=m3u8_link, headers=headers).text.splitlines() for num in range(0, len(m3u8_text)): if m3u8_text[num].startswith("#EXT-X-STREAM"): res_video = re.findall(r'RESOLUTION=*\w*', m3u8_text[num])[0].split("=")[1].split("x")[0] res_m3u8.update() list_res = [] for res in res_m3u8.keys(): list_res.append(int(res)) print(Fore.GREEN + f'[+] Найдено видео: ""') print(Fore.GREEN + f'[+] Доступно для загрузки качество: ') return res_m3u8[str(sorted(list_res)[-1])], title except AttributeError: return

Получение ссылок на фрагменты видео

Теперь необходимо получить ссылки на фрагменты видео, которые мы впоследствии будем загружать. Для этого создадим функцию get_chunk_link(res_max_link), которая на входе принимает ссылку на плейлист с фрагментами максимального разрешения.
Создадим список, в который будем складывать полученные ссылки на фрагменты. Этот список мы будем возвращать из функции. Скачиваем плейлист с названиями фрагментов и так как это текст, делим его на строки.

 links_chunk = [] m3u8_chunk = requests.get(url=res_max_link, headers=headers).text.splitlines()

Пробегаемся по списку строк в цикле. Проверяем, не является ли начало строки «#». Если нет, делаем следующую проверку, нет ли в строке знака вопроса. Если знак вопроса есть, значит ссылка на фрагмент содержится в самом плейлисте. Если знака вопроса нет, значит ссылкой на фрагмент будет ссылка на плейлист. Но, так как нам нужна не вся ссылка, а только часть до «index», разбиваем строку по «/». И проверяем получившийся список в цикле. Если искомого слова нет в элементе списка, добавляем его к формируемой ссылке. Если есть, не добавляем. А так как это и есть последний элемент списка, после этого выходим из цикла. После этого формируем ссылку и добавляем в список. Если вопроса нет, тогда просто формируем ссылку из ссылки на плейлист и названия фрагмента. После этого, возвращаем получившийся словарь из функции.

 for line in m3u8_chunk: if not line.startswith("#"): if "?" in line: res_max = '' for n in res_max_link.split("/"): if not n.startswith("index"): res_max = res_max + n + "/" line = line.split("?")[0] links_chunk.append(f'') else: links_chunk.append(f'') return links_chunk

Код функции получения ссылок на фрагменты видео

def get_chunk_link(res_max_link): """ Загружаем плейлист со списком фрагментов и разбиваем его построчно - m3u8_chunk. В цикле пробегаемся по каждой строке. Забираем названия фрагментов и формируем ссылки на видео, которые добавляем в список. :param res_max_link: ссылка на плейлист с видео. :return: возвращает сформированный список ссылок на фрагменты видео. """ links_chunk = [] m3u8_chunk = requests.get(url=res_max_link, headers=headers).text.splitlines() for line in m3u8_chunk: if not line.startswith("#"): if "?" in line: res_max = '' for n in res_max_link.split("/"): if not n.startswith("index"): res_max = res_max + n + "/" line = line.split("?")[0] links_chunk.append(f'') else: links_chunk.append(f'') return links_chunk

Скачивание фрагментов видео

Для скачивания фрагментов видео я создал отдельную функцию и назвал ее chunk_download(links_chunk, title). На вход она получает список со ссылками на фрагменты и название видео, которое нужно будет для создания папки, в которую будут загружаться фрагменты видео.
Формируем путь к директории, в которую будут загружаться фрагменты видео. Создаем папки, нужные для загрузки.

 dir_vid = os.path.join(os.getcwd(), 'video', title) if not os.path.exists(os.path.join(os.getcwd(), 'video')): os.mkdir(os.path.join(os.getcwd(), 'video')) if not os.path.exists(dir_vid): os.mkdir(dir_vid)

Создадим список, в который будем складывать полный путь к загруженным фрагмента видео. Это необходимо для того, чтобы впоследствии удалить загруженные фрагменты. Выводим сообщение о загрузке в терминал.

 path_list = [] print(Fore.GREEN + f'\n[+] Загрузка фрагментов ():')

Устанавливаем длину прогресс-бара, которая будет равна количеству ссылок на фрагменты. Выводим в консоль сообщение с «[» + пробелы, количество которых равно установленной длине прогресс-бара. Очищаем буфер. Возвращаем каретку в начало строки + 1 символ. Для того, чтобы выводить прогресс загрузки.

 toolbar_width = len(links_chunk) sys.stdout.write("[%s]" % (" " * toolbar_width)) sys.stdout.flush() sys.stdout.write("\b" * (toolbar_width + 1))

Создаем цикл, в котором пробегаемся по каждой ссылке из словаря ссылок к фрагментам видео. Выводим в консоль текст, очищаем буфер. Текст будет равен индексу ссылки на фрагмент + 1, который был получен с помощью функции enumerate. Формируем название для загружаемого фрагмента. Название я решил сделать одинаковое для всех, так как в каждом плейлисте разные названия и не всегда получается легко достать это название оттуда. А так, одно, универсальное название для всех.

 for num, chunk in enumerate(links_chunk): sys.stdout.write(f".") sys.stdout.flush() chunk_name = f'seg.ts'

Скачиваем фрагмент с помощью requests, после чего сохраняем его на диск присвоив ему сформированное имя. В список путей к загруженным фрагментам видео добавляем путь.

 chunk_d = requests.get(url=chunk, headers=headers).content with open(os.path.join(dir_vid, chunk_name), 'wb') as chunk_f: path_list.append(os.path.join(dir_vid, chunk_name)) chunk_f.write(chunk_d)

После того, как последний фрагмент будет загружен, печатаем завершающее сообщение в консоли, выводим сообщение, что загрузка завершена и возвращаем из функции список с путями к загруженным фрагментам и путь к директории, в которую загружали видео.

 sys.stdout.write("end]") print(Fore.CYAN + '\n[+] Загрузка фрагментов завершена') return path_list, dir_vid

Объединение фрагментов в один и конвертация объединенного фрагмента в mp4

После того, как все фрагменты видео будут загружены, все их нужно объединить в один фрагмент для конвертации. Создадим функцию chunk_merge(path_list, dir_vid, title), которая на входе получает список с путями к загруженным фрагментам, путь к папке в которую загружены фрагменты, и название видео.

Выводим в терминал сообщение для пользователя о конвертации. Создаем в той же папке, куда загружали фрагменты файл с расширением «.ts», в который будем записывать объединенные фрагменты и откроем его для записи в побайтовом режиме. Затем в цикле пробежимся по списку ссылок на фрагменты, откроем каждый из фрагментов в побайтовом режиме и добавим его содержимое с помощью функции shutil.copyfileobj к записываемому объединенному файлу.

После того, как цикл завершит работу и объединенный фрагменты будут сохранены с названием видео, которое мы получили на входе функции, запускаем команду для конвертации видео из формата «.ts» в формат mp4 с помощью ffmpeg. В команду, которую нужно выполнить передаем путь к объединенным фрагментам и путь с названием видео, по которому нужно будет сохранить конвертированное видео.

 print(Fore.YELLOW + "\n[+] Конвертация") with open(os.path.join(dir_vid, f'.ts'), 'wb') as merged: for cnk in path_list: with open(cnk, 'rb') as mergefile: shutil.copyfileobj(mergefile, merged) os.system(f"ffmpeg -i .ts .mp4 2> /dev/null")

После того, как закончиться конвертация видео, нужно удалить как загруженные фрагменты, так и объединенный файл. Поэтому в цикле пробегаемся по списку с путями к фрагментам и удаляем их с помощью функции os.remove. После того, как цикл выполниться, удаляем объединенный файл.

 for it_ch in path_list: os.remove(it_ch) os.remove(f'.ts') print(Fore.YELLOW + "[+] Конвертация завершена")

Код функции объединения и конвертации видео

def chunk_merge(path_list, dir_vid, title): """ Открываем каждый файл побайтово, и с помощью shutil.copyfileobj копируем открываемые файлы в файл для записи. После чего, записываем объединенные фрагменты с названием видео. Выполняем команду конвертации видео с помощью ffmpeg, вывод убираем в null. После того как конвертация видео будет завершена, удаляем фрагменты, список путей к которым мы передали в функцию, а также удаляем объединенный файл с расширением .ts :param path_list: список ссылок на загруженные фрагменты видео. :param dir_vid: директория загрузки видео. :param title: название видео. """ print(Fore.YELLOW + "\n[+] Конвертация") with open(os.path.join(dir_vid, f'.ts'), 'wb') as merged: for cnk in path_list: with open(cnk, 'rb') as mergefile: shutil.copyfileobj(mergefile, merged) os.system(f"ffmpeg -i .ts .mp4 2> /dev/null") for it_ch in path_list: os.remove(it_ch) os.remove(f'.ts') print(Fore.YELLOW + "[+] Конвертация завершена")

Функция для пользовательского ввода и запуска функций

Создадим еще одну функцию, которую назовем main(). В данной функции мы будем запрашивать у пользователя ссылку на страницу с видео. В данном случае, так как наш скрипт работает со ссылкой на мобильную версию, нужно будет сделать ряд проверок. Но вводить ссылки можно как к обычной версии ВК, так и к мобильной.

Запросим у пользователя ссылку. Проверим, является ли ссылка ссылкой на ВК. Затем выполним проверку, нет ли в ссылке, которую ввел пользователь слова «pleylist», так как скрипт не умеет работать с плейлистами. А работает только с отдельными видео. Затем проверяем, не является ли введенная ссылка, ссылкой на мобильную версию ВК. Если да, присваиваем переменной url значение, которое ввел пользователь. Если же пользователем была введена обычная ссылка, тогда разберем ее на части и соберем снова, добавив к адресу «m», что и будет ссылкой на мобильную версию.

 user_input = input('Введите ссылку на страницу с видео: ') if 'vk.com/video' not in user_input: print(Fore.RED + '[-] Ссылка неверна.') return if 'playlist' in user_input: print(Fore.RED + '[-] Вы ввели ссылку на плейлист.') return if 'https://m.vk.com/' in user_input: url = user_input else: url = f'//m./'

И теперь осталось запускать функции по порядку. Для начала, функцию для получения ссылки на плейлист со ссылками на плейлисты. Затем функцию для получения ссылок на фрагменты, после запускаем функцию загрузки фрагментов видео, и в самом конце функцию объединения и конвертации видео.

Обернем код запуска функций в блок try-except, так как иногда отлавливается исключение, когда не получается найти ссылок на видео. Такое у меня было, пока, только со ссылками на стрим.

 try: res_max_link, title = get_m3u8(url) links_chunk = get_chunk_link(res_max_link) path_list, dir_vid = chunk_download(links_chunk, title) chunk_merge(path_list, dir_vid, title) except TypeError: print(Fore.RED + '[-] Не удалось получить ссылки на фрагменты.') return

Код функции main()

def main(): """ Запрашиваем у пользователя ссылку на видео. Проверяем, является ли ссылка, ссылкой на ВК. Проверяем, не является ли ссылка ссылкой на плейлист. Проверяем, не ввел ли пользователь ссылку на страницу для мобильных телефонов. Запускаем функцию получения ссылки на плейлист видео и заголовок. Запускаем функцию получения ссылок на фрагменты видео. Запускаем функцию скачивания видео. Запускаем функцию конвертации из ts в mp4. :return: выход из функции. """ user_input = input('Введите ссылку на страницу с видео: ') if 'vk.com/video' not in user_input: print(Fore.RED + '[-] Ссылка неверна.') return if 'playlist' in user_input: print(Fore.RED + '[-] Вы ввели ссылку на плейлист.') return if 'https://m.vk.com/' in user_input: url = user_input else: url = f'//m./' try: res_max_link, title = get_m3u8(url) links_chunk = get_chunk_link(res_max_link) path_list, dir_vid = chunk_download(links_chunk, title) chunk_merge(path_list, dir_vid, title) except TypeError: print(Fore.RED + '[-] Не удалось получить ссылки на фрагменты.') return

Таким вот способом, с помощью обычного парсинга мы смогли загрузить видео. На самом деле, я и раньше загружал видео из ВК при большой необходимости. И пользовался для этого сторонними сервисами, такими, как SaveFromNet. Но, почему-то, загрузка видео из ВК для этого сервиса прямо испытание. Очень медленно грузит. С другой стороны, если видео большое, то и скрипт на питоне будет так же долго подгружать фрагменты, а потом еще время уйдет на конвертацию. Тем не менее, я хотя бы понимаю, что происходит в данный момент.

В процессе написания данного скрипта я понял, как осуществляется доставка видео до конечного пользователя из ВК. А теперь, вот полный код скрипта загрузки видео:

Полный код скрипта загрузки видео из ВК

""" Скрипт для загрузки видео из ВК. Загружает почти любое видео, проблемы при загрузке стримов. На них он не находит ссылок. Для того чтобы данный скрипт работал, нужно установить следующие библиотеки: pip install requests bs4 lxml colorama """ import os import re import shutil import sys import requests from bs4 import BeautifulSoup from colorama import Fore from colorama import init init() headers = < 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 ' 'Safari/537.36', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,' 'application/signed-exchange;v=b3;q=0.9 ' >def rep_symbol(text: str): """ Функция выполняет замену символов в названии, которые могут помешать сохранению файла на диск. :param text: текст для замены символов. :return: модифицированный текст """ tex = text.replace("'", "").replace('"', '').replace('|', '_').replace(' | ', '_').replace('/', '_'). \ replace('\\', '_').replace('*', '_').replace('?', '').replace('', '_').replace(':', ''). \ replace(';', '').replace('.', '').replace(' ', '_').replace(')', '').replace('(', '').strip() return tex def get_m3u8(url): """ Выполняем запрос к странице с видео. Получаем код со страницы. Ищем название видео, заменяем все знаки, которые могут помешать сохранить видео с этим названием и сохраняем значение в переменной title. Получаем ссылку на плейлист - m3u8_link, в котором содержаться ссылки на плейлисты с разным разрешением. Забираем все разрешения и ссылки из плейлиста. Добавляем в словарь - res_m3u8. В цикле забираем из словаря все ключи, которыми являются разрешения видео, добавляем в список list_res переведя в int. Возвращаем из функции ссылку на плейлист видео с самым большим разрешением, которую получаем путем сортировки списка с разрешениями, откуда забираем последний элемент и получаем ссылку из словаря. Также возвращаем название видео. :param url: ссылка на страницу с видео. :return: Возвращает ссылку на видео с самым большим разрешением и название. """ res_m3u8 = dict() req_m3u8 = requests.get(url=url, headers=headers) soup = BeautifulSoup(req_m3u8.text, 'lxml') title = rep_symbol(soup.find('h1', class_='VideoPageInfoRow__title').text) try: m3u8_link = soup.find('video', class_='vv_inline_video').find('source').get('src') m3u8_text = requests.get(url=m3u8_link, headers=headers).text.splitlines() for num in range(0, len(m3u8_text)): if m3u8_text[num].startswith("#EXT-X-STREAM"): res_video = re.findall(r'RESOLUTION=*\w*', m3u8_text[num])[0].split("=")[1].split("x")[0] res_m3u8.update() list_res = [] for res in res_m3u8.keys(): list_res.append(int(res)) print(Fore.GREEN + f'[+] Найдено видео: ""') print(Fore.GREEN + f'[+] Доступно для загрузки качество: ') return res_m3u8[str(sorted(list_res)[-1])], title except AttributeError: return def get_chunk_link(res_max_link): """ Загружаем плейлист со списком фрагментов и разбиваем его построчно - m3u8_chunk. В цикле пробегаемся по каждой строке. Забираем названия фрагментов и формируем ссылки на видео, которые добавляем в список. :param res_max_link: ссылка на плейлист с видео. :return: возвращает сформированный список ссылок на фрагменты видео. """ links_chunk = [] m3u8_chunk = requests.get(url=res_max_link, headers=headers).text.splitlines() for line in m3u8_chunk: if not line.startswith("#"): if "?" in line: res_max = '' for n in res_max_link.split("/"): if not n.startswith("index"): res_max = res_max + n + "/" line = line.split("?")[0] links_chunk.append(f'') else: links_chunk.append(f'') return links_chunk def chunk_download(links_chunk, title): """ Создаем папки, если они не существуют, в которые будем загружать фрагменты видео. Для загрузки фрагментов создаем папку с названием видео. Устанавливаем значение для отображения прогресс-бара - toolbar_width. Печатаем в консоли "[" и очищаем буфер. После чего возвращаем каретку к началу строки, плюс один символ. Перебираем все ссылки в списке и выполняем загрузку каждого фрагмента. В консоль выводим цифры соответствующие загрузке определенного фрагмента. Сохраняем фрагменты в папку с названием видео. :param links_chunk: список ссылок на фрагменты видео. :param title: название видео. :return: список путей к загруженным фрагментам, путь к папке с фрагментами. """ dir_vid = os.path.join(os.getcwd(), 'video', title) if not os.path.exists(os.path.join(os.getcwd(), 'video')): os.mkdir(os.path.join(os.getcwd(), 'video')) if not os.path.exists(dir_vid): os.mkdir(dir_vid) path_list = [] print(Fore.GREEN + f'\n[+] Загрузка фрагментов ():') toolbar_width = len(links_chunk) sys.stdout.write("[%s]" % (" " * toolbar_width)) sys.stdout.flush() sys.stdout.write("\b" * (toolbar_width + 1)) for num, chunk in enumerate(links_chunk): sys.stdout.write(f".") sys.stdout.flush() chunk_name = f'seg.ts' chunk_d = requests.get(url=chunk, headers=headers).content with open(os.path.join(dir_vid, chunk_name), 'wb') as chunk_f: path_list.append(os.path.join(dir_vid, chunk_name)) chunk_f.write(chunk_d) sys.stdout.write("end]") print(Fore.CYAN + '\n[+] Загрузка фрагментов завершена') return path_list, dir_vid def chunk_merge(path_list, dir_vid, title): """ Открываем каждый файл побайтово, и с помощью shutil.copyfileobj копируем открываемые файлы в файл для записи. После чего, записываем объединенные фрагменты с названием видео. Выполняем команду конвертации видео с помощью ffmpeg, вывод убираем в null. После того как конвертация видео будет завершена, удаляем фрагменты, список путей к которым мы передали в функцию, а также удаляем объединенный файл с расширением .ts :param path_list: список ссылок на загруженные фрагменты видео. :param dir_vid: директория загрузки видео. :param title: название видео. """ print(Fore.YELLOW + "\n[+] Конвертация") with open(os.path.join(dir_vid, f'.ts'), 'wb') as merged: for cnk in path_list: with open(cnk, 'rb') as mergefile: shutil.copyfileobj(mergefile, merged) os.system(f"ffmpeg -i .ts .mp4 2> /dev/null") for it_ch in path_list: os.remove(it_ch) os.remove(f'.ts') print(Fore.YELLOW + "[+] Конвертация завершена") def main(): """ Запрашиваем у пользователя ссылку на видео. Проверяем, является ли ссылка, ссылкой на ВК. Проверяем, не является ли ссылка ссылкой на плейлист. Проверяем, не ввел ли пользователь ссылку на страницу для мобильных телефонов. Запускаем функцию получения ссылки на плейлист видео и заголовок. Запускаем функцию получения ссылок на фрагменты видео. Запускаем функцию скачивания видео. Запускаем функцию конвертации из ts в mp4. :return: выход из функции. """ user_input = input('Введите ссылку на страницу с видео: ') if 'vk.com/video' not in user_input: print(Fore.RED + '[-] Ссылка неверна.') return if 'playlist' in user_input: print(Fore.RED + '[-] Вы ввели ссылку на плейлист.') return if 'https://m.vk.com/' in user_input: url = user_input else: url = f'//m./' try: res_max_link, title = get_m3u8(url) links_chunk = get_chunk_link(res_max_link) path_list, dir_vid = chunk_download(links_chunk, title) chunk_merge(path_list, dir_vid, title) except TypeError: print(Fore.RED + '[-] Не удалось получить ссылки на фрагменты.') return if __name__ == "__main__": main()

А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *