Кабинет разработчика в VK
Документация по API. Будем использовать Ads и Wall
Статья про OAuth - авторизацию, которую использует VK и другие.
Нам нужно получить статистику в разрезе UTM меток, чтобы можно было сопоставить с Google Analytics / Yandex Metrika. Сразу скажу, что ссылок в статистике нет и придется совершить путешествие, чтобы найти ссылки и достать из них utm_ метки или шаблон, по которому они строятся.
Шаги:
Установим и импортируем набор полезных функций для вывода данных. Любопытных прошу пройти по адресу https://github.com/madiedinro/rodin_helpers_py, не забудьте звездочку постаить, или не пользуйтесь тогда ;)
!pip install -U git+https://github.com/madiedinro/rodin_helpers_py -q
import rodin_helpers as rh
Импортируем необходимые зависимости
import requests
import os
import json
import arrow
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, quote
from dotenv import load_dotenv
from pprint import pprint
Подготовим парочку простеньких функций, которые позволят не получать токен каждый раз:
storage_fn = 'vk_data.json'
def save_dict(d):
raw = json.dumps(d)
with open(storage_fn, 'w') as f:
f.write(raw)
# Загрузка данных из файла, если таковой имеется.
# Если файл отсутствует это нормально и ошибки не будет
def read_dict():
try:
with open(storage_fn, 'r') as f:
raw = f.read()
return json.loads(raw)
except:
pass
return {}
Итак, вы получили от VK параметры приложения, подставьте их в код или запишите в файл creds_fn
# версия API, требуется ее знать и передавать в запросах
vk_v = '5.92'
# Параметры приложения
VK_APP_ID = '666777'
VK_APP_SECRET = 'oe0OsaelohSooph4noh1eu'
VK_TOKEN = read_dict().get('access_token')
# Подсказка для самих себя. Дальше поймете
if VK_TOKEN:
print('Токен уже есть, получать не требуется')
# Форматируем даты в формат, требуемый VK: 2019-01-01
d1 = arrow.get('2019-04.15').format('YYYY-MM-DD')
d2 = arrow.get('2019-04-21').format('YYYY-MM-DD')# Формируем объект запроса к API
Требуется пройти по ссылке, она отфутболит на страницу где можно взять код (вообще задумано что автоматом должно получаться). Его обработка не сделана, поэтому просто возьмите его из строки адреса браузера
# пропускаем ячейку, если токен уже есть
from urllib.parse import urlencode
if not VK_TOKEN:
code_params = {
'redirect_uri': 'https://oauth.vk.com/blank.html',
'client_id': VK_APP_ID,
'scope': 'ads,offline',
'response_type': 'code'
}
vk_auth_url = f'https://oauth.vk.com/authorize?' + urlencode(code_params)
print('redirect to:', vk_auth_url)
print('-----------------------')
print(rh.show_link(vk_auth_url))
code = 'a053b1263e7238efd5'
# пропускаем ячейку, если токен уже есть
if not VK_TOKEN:
token_params = {
'client_id': VK_APP_ID,
'client_secret': VK_APP_SECRET,
'code': code,
'redirect_uri': code_params["redirect_uri"],
}
token_url= f'https://oauth.vk.com/access_token?' + urlencode(token_params)
responce = requests.get(token_url)
if responce.status_code == 200:
auth_result = responce.json()
# Сохраняем токен в файл, чтобы в следующий раз не требовалось
# проходить процедуру получения токена через подтверждение кодом
save_dict(auth_result)
VK_TOKEN = auth_result['access_token']
user_id = auth_result['user_id']
# }
# Обрежем токен, чтобы удостовериться, что он есть, но не палить целиком
print('Ваши ключи для доступа сохранены в переменную.', user_id, VK_TOKEN[:20])
# Если код ответа отличается от 200, печатаем сообщение с ошибкой
else:
print('error!', responce.status_code, responce.text)
Запросим информацию о рекламном аккаунте
# Словарь базовых параметров: авторизационный токен и нужная версия API.
# Требуется передавать со всеми запросами.
common_params = {
'access_token': VK_TOKEN,
'v': vk_v
}
vk_accounts_url = 'https://api.vk.com/method/ads.getAccounts'
resp = requests.get(vk_accounts_url, params=common_params)
pprint(resp.json())
Легко, не правда ли?)
API VK всегда выдает инфоммацию в виде {'response': ...
Ф-ция и переменные, которые потребуются для дальнейшего общения с VK
# Шаблон для адресов методов VK (можно узнать в документации,
# ссылки на которую в самом верху)
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Ставим пометку, нужно ли запрашивать удаленные элементы
vk_include_deleted = 0
# Функция выполнения запроса к VK
def vk_req(method, params=None, data = None, http_method = 'get'):
# мы специально не используем дефолтовые параметры = {},
# там есть подводные камни, которые мы не будем разбирать.
# Просто поверьте.
if params is None:
params = {}
# Что касается http запросов, что такое data, что такое params смело
# ступайте в документацию по протоколу HTTP и библиотеки requests :) или на курсы digital god :)
if data is None:
data = {}
# Если запрашиваем постом, то перемещаем параметры в тело запроса.
if http_method == 'post':
# Дополняем data содержимым params
data.update(params)
params = {}
# Но в параметрах все равно передадутся базовые параметры (версия и токен)
final_params = {**common_params, **params}
# выполняем запрос, который вернет объект ответа Response
resp = requests.request(http_method, vk_call_tmpl.format(method=method), data=data, params=final_params)
# Воспользуемся ислючениями для остановки выполнения программы.
# Это позволит избежать можества дополнительных проверок дальше
if resp.status_code != 200:
print(resp.status_code, resp.text())
raise Exception('Wrong response code')
# преобразовываем текстовый json в формат данных python
result = resp.json()
# API всегда выдает ответ в {'response':, можно это сразу и обработать
if 'response' not in result:
print(resp.status_code, result)
raise Exception('No response in result')
return result['response']
Чтобы каждый раз не париться с выводом первого элемента (я это делаю для наглядности, а если выводить все, то придется очень много скроллить)
cоздадим функцию, которая выводит n
элементов из словаря, ведь в дальнейшем мы именно их будем использовать для быстрого поиска необходимых объектов
def first(mydict, amount=1):
return list(mydict.values())[:amount]
Запрашиваем аккаунты текущего пользователя, чтобы удостовериться, что все так
vk_accs = vk_req('ads.getAccounts')
account_id = vk_accs[0]['account_id']
pprint(vk_accs)
Интересующие я заранее пометил префиксом ||dg|| в интерфейсе VK, так в обработку попадут только нужные мне кампании
# Словарь, куда будем дописывать структуру кампаний
vk_camps = {}
# Отдельно создадим список с айдишками кампаний, он нам дальше пригодится
vk_camps_ids = []
# Выполняем запрос
call_params={'account_id': account_id, 'include_deleted': vk_include_deleted}
vk_camps_resp = vk_req('ads.getCampaigns', call_params)
for camp in vk_camps_resp:
if True or camp.get('name', '').startswith('||dg||'):
camp_id = str(camp['id'])
vk_camps_ids.append(camp_id)
vk_camps[camp_id] = camp
print('vk_camps_ids:', vk_camps_ids)
# вернуть только первую кампанию
pprint(vk_camps[vk_camps_ids[0]])
first(vk_camps, 2)
Они нужны чтобы достать оттуда ссылки
# В этом словаре будет вся-вся инфа по объявлениям
ads = {}
# Отдельно сохраним айдишки объявлений в списке, чтобы запросить по ним статистику
vk_ads_ids = []
# Строим запрос к методу
call_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
print('vk_ads_req_params=', call_params)
# Выполняем запрос
vk_ads_resp = vk_req('ads.getAds', params=call_params)
for i, ad in enumerate(vk_ads_resp):
vk_ads_ids.append(ad['id'])
ad_id = ad['id']
ads[ad_id] = {
# Сохраняем структуру объявления
'ad': ad,
# Дописываем прямо к объявлению структуру кампании
'camp': vk_camps.get(str(ad['campaign_id']))
}
print(f'processed {i} ads')
print('---')
pprint(first(ads))
print('---')
ads_list = list(ads.values())
pprint(ads_list[1])
Только вот ссылок тут нет. Ищем дальше и находим метод вads.getAdsLayout
Как оказалось их тут тоже нет, но есть ссылки на специально созданные под эти объявления рекламные посты.
Дополнительно придется построить справочник соответсвия постов и объявлений wall_ids
. Layout объявлениея просто допишем к словарю ads
.
# Маска, по которой можно отличить, что объявление является постом
POST_PREFIX = 'http://vk.com/wall'
wall_ids = {}
call_params={
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
vk_ads_lays = {}
vk_ads_lay_resp = vk_req('ads.getAdsLayout', params=call_params)
# Обходим результаты запроса
for i, ad in enumerate(vk_ads_lay_resp):
ad_id = str(ad.get('id'))
camp_id = str(ad.get('campaign_id'))
# Проверяем что у нас есть такая кампания
if not camp_id or camp_id not in vk_camps:
continue
# Проверяем что такое объявдение получено
if ad_id not in ads:
continue
# Достаем ссылку из словаря по ключу link_url
link_url = ad.get('link_url')
if link_url and link_url.startswith(POST_PREFIX):
# Находим ID записи на стене
wall_id = link_url[len(POST_PREFIX):]
# Берем список из словаря, если элемент отсутствует, то будет выдано дефолтовое значение - пустой список
wall_id_ids = wall_ids.get(wall_id, [])
wall_id_ids.append(ad_id)
# Записываем в словарь
wall_ids[wall_id] = wall_id_ids
# Сохраняем в общем справочнике объявлений
ad['wall_id'] = wall_id
ads[ad_id]['layout'] = ad
print(f'processed {i} ads')
print('---')
pprint(first(ads))
print('---')
pprint(wall_ids)
Делаем это только по подготовленной таблице соответствия постов (словарь, где ключ это id поста).
# Как и с выводом первого элемента, берем ключи словаря и преобразовываем их в список [*wall_ids.keys()]
vk_posts = {}
vk_posts_resp = vk_req('wall.getById', {'posts': ",".join([*wall_ids.keys()])})
for post in vk_posts_resp:
post_id = '{from_id}_{id}'.format(**post)
# По ранее составленному справочнику связей постов и объявлений находим соответствие
for ad_id in wall_ids[post_id]:
ads[ad_id]['post'] = post
# Ищем ссылку в аттачах
for attach in post.get('attachments'):
if attach['type'] == 'link':
link = attach['link']
# Дописываем к объявлению
ads[ad_id]['url'] = link.get('url')
print(f'processed {i} posts\n---')
pprint(post)
Как видите в посте содержится оооочень много всего. Но нас интересует лишь ссылка, ее то мы и достали
В итоге по каждому объявлению получили вот такую структуру
pprint(first(ads))
Поехали дальше. Доставать разметку
Пример разметки:
utm_source=vk&utm_medium=cpm&utm_campaign={campaign_id}&utm_content={ad_id}
# Соответствия между параметрами статистики и шаблона
PARAMS_MAP = {
'campaign_name': '{camp[name]}',
'campaign_id': '{camp[id]}',
'ad_id': '{ad[id]}'
}
# Функция, которая получает на вход ссылку с шаблоном и объявление а на выходе дает полностью сформированную ссылку
def set_params(tmpl, ad):
for prop, value in PARAMS_MAP.items():
key_map = PARAMS_MAP.get(prop, prop.lower())
value_esc = value.format(**ad)
tmpl = tmpl.replace('{'+ prop + '}', value_esc)
return tmpl
# Из ссылки достает utm_* метки.
def extract_utms(url):
parsed_url = urlparse(url)
query = parse_qs(parsed_url.query)
return {k: v[0] for k,v in query.items() if k.startswith('utm_')}
Давайте попробуем посмотреть, что делают эти функции.
tmpl = 'https://digitalgod.be/?utm_source=vk&utm_medium=cpm&utm_campaign={campaign_id}&utm_content={ad_id}'
set_params(tmpl, {'ad': {'id': 49726084}, 'camp': {'id':1010819423, 'name': 'ololo'}})
pprint(extract_utms('https://digitalgod.be/?utm_source=vk&utm_medium=cpm&utm_campaign=1010819423&utm_content=49726084'))
Как видите, мы успешно экстраполировали шаблон, и затем разобрали его при помощи функций разбора строк
stat_params = {
'period': 'day',
'account_id': account_id,
'ids_type': 'ad',
'ids': ",".join(vk_ads_ids),
'date_from': d1,
'date_to': d2,
}
print('--- params ')
pprint(stat_params)
print('---')
vk_stats = vk_req('ads.getStatistics', stat_params)
# Список под статистику, ведь статистика это набор однотипных записей неизвестного кол-ва
vk_ads_stats = []
for ad_stats in vk_stats:
# Находим объявление в заготовленном справочнике
ad_id = str(ad_stats['id'])
# Если у нас такое объявление есть
ad = ads.get(ad_id)
if not ad:
continue
# Проверяем, что у объявления имеется url
url = ad.get('url', None)
if not url:
continue
# Подставляет шаблонные параметры, если имеются
final_url = set_params(url, ad)
# Достаем utm_*
utms = extract_utms(final_url)
# Проверяем что есть хоть какая-то статистика
if not ad_stats.get('stats'):
continue
# pop - забрать из словаря (прям забрать) значение по ключу.
ad_stat = ad_stats.get('stats')
# Обходим пришедшие по объявлению данные
for day_stat in ad_stat:
# Готовим дополнение для словаря записи строки статистики
upd = {
'date': day_stat.pop('day'),
'ad_id': ad_id,
'campaign_id': ads[ad_id]['camp']['id'],
'account_id': account_id
}
# Объединяем все данные
rec = {**utms, **day_stat,**upd}
vk_ads_stats.append(rec)
pprint(vk_ads_stats[:2])
Осмотрим структуры, которые были у нас по пути
print('--- статистика одного объявления ')
pprint(ad_stat)
print('--- дополнение к одной из записей в стате объявления')
pprint(upd)
print('--- финальная версия одной записи в стате ')
pprint(rec)
Вот что получили в итоге. Статистика + метки, все что нужно!
rh.print_rows(vk_ads_stats, limit=10)
Импортируем библиотеку simplech и создаем инстанс клиента. Сразу воспользуемся асинхронной версией, чтобы потом меньше переделывать.
from simplech import ClickHouse
ch = ClickHouse()
Воспользуемся TableDiscovery из модуля simplech для автоматической генерации схемы БД
Выполняем SQL запрос
td.merge_tree(execute=True)
query = f'SELECT count() FROM {td.table}'
print('query:', query)
result = ch.select(query)
print('result:', result)
При использовании td.difference(d1, d2) и td.push нет необходимости самостоятельно вызывать ch.flush в конце, он выполнится автоматически
# в целях наглядности запишем сюда то, что отправилось на запись в БД
written = []
# инициализируем оброаботчик разницы
with td.difference(d1.format('YYYY-MM-DD'), d2.format('YYYY-MM-DD'), vk_ads_stats) as d:
# он является асинхронным генератором и его следует обойти циклом
for row in d:
# добавляем в буфер на запись
td.push(row)
# пишем себе, чтобы потом поглядеть
written.append(row)
rh.print_rows(written, limit=10)
Посмотрим что в таблице
query = 'SELECT * FROM vk_stat ORDER BY date'
rows = [r for r in ch.objects_stream(query)]
rh.print_rows(rows, limit=10)
У нас все получилось. Данные были получены и записаны в ClickHouse. Обрабатывается "дописывание" свежих данных. Многим и этого хватит а кому не хватит, пошли дальше!
Теперь надо превратить это из формата блокнота, где надо все выполнять руками в удобный набор функций, которые можно будет перенести в сервис, где они будут выполняться по расписанию. Более того, будет автоматически обрабатываться получение токена.
Заранее прошу прощения, но подробно расписывать не буду, только какие-то специфичные места. Для всего остального есть документация (ее мало, но есть) и доки питона.
Соберем вместе все, что нам понадобится. Просто соберем, тут не все сможет запуститься
import asyncio
from band import expose, cleanup, worker, settings as cfg, logger, response
import aiohttp
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, quote
import json
from simplech import AsyncClickHouse
import arrow
from aiocron import crontab
vk_v = '5.92'
# Превращаем в обычный шаблон, чтобы применить параметры в момент использования
token_url_tmpl = 'https://oauth.vk.com/access_token?client_id={id}&client_secret={secret}&code={code}&redirect_uri={redir}'
# ссылка, куда отправлять для авторизации
vk_auth = f'https://oauth.vk.com/authorize?client_id={cfg.app_id}&redirect_uri={cfg.app_redir}&scope=ads,offline&response_type=code&v={vk_v}'
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Шаблон для адресов методов VK (можно узнать в документации, ссылки на которую в самом верху)
vk_call_tmpl = 'https://api.vk.com/method/{method}'
# Ставим пометку, нужно ли запрашивать удаленные элементы
vk_include_deleted = 0
# Папка с конфигами
conf_dir = ''
# Файлы с переменными окружения. Использются для конфигурации приложений
# creds_fn = f'{conf_dir}.env.vk' - в сервисах рокстат есть специальные конфиги
token_fn = f'{conf_dir}.env.vk_token'
POST_PREFIX = 'http://vk.com/wall'
# Соответствия между параметрами статистики и шаблона
params_map = {
'campaign_name': '{camp[name]}',
'campaign_id': '{camp[id]}',
'ad_id': '{ad[id]}'
}
# С редиректом тут будет отдельная история, можно сразу его обработать, не копируя ключ из строки адреса
# https://{{DOMAIN}}/vk_collect/get_code
# Тут:
# {{DOMAIN}} - домен с рокстат - подставляется автоматом
# vk_collect - название сервиса в рокстат
# get_code - название метода в сервисе
# >>> Пометка: не забыть поместить в конфиг APP_ID, APP_SECRET
common_params = {
'access_token': None,
'v': vk_v
}
# создаем клиента ClickHouse
ch = AsyncClickHouse()
app_redir: https://{{DOMAIN}}/vk_collect/get_code
app_id: {{APP_ID}}
app_secret: {{APP_SECRET}}
APP_ID=ваши данные от приложения vk
APP_SECRET=ваши данные от приложения vk
Перепишем функцию выполнения запроса к API в асинхронный формат
async def vk_req_async(method, params=None, data=None, http_method='get'):
if params is None:
params = {}
if http_method == 'post':
data = data or {}
data.update(params)
params = {}
final_params = {**common_params, **params}
url = vk_call_tmpl.format(method=method)
logger.debug('requesting', u=url, p=final_params)
async with aiohttp.ClientSession() as session:
async with session.request(http_method, url, json=data, params=final_params) as res:
if res.status != 200:
print(res.status, await res.text())
raise Exception('Wrong response code')
result = await res.json()
if 'response' not in result:
print(res.status, result)
raise Exception('No response in result')
return result['response']
Пример использования
await vk_req_async('ads.getAccounts')
[
{
'access_role': 'admin',
'account_id': 1603421955,
'account_name': 'Личный кабинет',
'account_status': 1,
'account_type': 'general'
}
]
Набор необходимых функций для хранения токена и разбора url
def save_token(access_token):
with open(token_fn, 'w') as f:
f.write(access_token)
def load_token():
try:
with open(token_fn) as f:
# read читает все содержимое файла. strip убирает переносы строк по краям
return f.read().strip()
except Exception:
logger.exception('ex')
# Функция, которая получает на вход ссылку с шаблоном и объявление и на выходе дает полностью сформированную ссылку
def set_params(url, ad):
for prop, value in params_map.items():
key_map = params_map.get(prop, prop.lower())
value_esc = value.format(**ad)
url = url.replace('{'+ key_map + '}', value_esc)
return url
# Из ссылки достает utm_* метки.
def extract_utms(url):
parsed_url = urlparse(url)
query = parse_qs(parsed_url.query)
return {k: v[0] for k,v in query.items() if k.startswith('utm_')}
async def load_data(d1, d2):
# Аккаунт
vk_accs = await vk_req_async('ads.getAccounts')
account_id = vk_accs[0]['account_id']
# Кампании
camps = {}
vk_camps_ids = []
vk_camps = await vk_req_async('ads.getCampaigns', params={'account_id': account_id, 'include_deleted': vk_include_deleted})
for camp in vk_camps:
if camp.get('name', '').startswith('||dg||'):
vk_camps_ids.append(camp['id'])
camps[camp['id']] = camp
# Немного подождем
await asyncio.sleep(0.5)
# Объявления
ads = {}
vk_ads_ids = []
vk_ads_req_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
vk_ads_resp = await vk_req_async('ads.getAds', params=vk_ads_req_params)
for i, ad in enumerate(vk_ads_resp):
vk_ads_ids.append(ad['id'])
ad_id = ad['id']
ads[ad_id] = {
'ad': ad,
'camp': camps.get(ad['campaign_id'])
}
wall_ids = defaultdict(list)
al_params = {
'account_id': account_id,
'include_deleted': vk_include_deleted,
'campaign_ids': json.dumps(vk_camps_ids)
}
await asyncio.sleep(1)
vk_ads_lay = await vk_req_async('ads.getAdsLayout', params=al_params)
for i, ad in enumerate(vk_ads_lay):
ad_id = ad.get('id')
camp_id = ad.get('campaign_id')
if not camp_id or camp_id not in vk_camps_ids:
continue
if ad_id not in ads:
continue
link_url = ad.get('link_url')
if link_url and link_url.startswith(POST_PREFIX):
wall_id = link_url[len(POST_PREFIX):]
wall_ids[wall_id].append(ad_id)
ad['wall_id'] = wall_id
ads[ad_id]['layout'] = ad
await asyncio.sleep(1)
vk_posts = await vk_req_async('wall.getById', {'posts': ",".join([*wall_ids.keys()])})
for post in vk_posts:
post_id = '{from_id}_{id}'.format(**post)
for ad_id in wall_ids[post_id]:
ads[ad_id]['post'] = post
for attach in post.get('attachments'):
if attach['type'] == 'link':
link = attach['link']
ads[ad_id]['url'] = link.get('url')
# Формируем объект запроса к API
stat_params = {
'period': 'day',
'account_id': account_id,
'ids_type': 'ad',
'ids': ",".join(vk_ads_ids),
'date_from': d1.format('YYYY-MM-DD'),
'date_to': d2.format('YYYY-MM-DD'),
}
await asyncio.sleep(1)
vk_stats = await vk_req_async('ads.getStatistics', stat_params)
# Список под статистику, ведь статистика это набор однотипных записей неизвестного кол-ва
vk_ads_stats = []
for ad_stats in vk_stats:
# Находим объявление в заготовленном справочнике
ad_id = str(ad_stats['id'])
# Если у нас такое объявление есть
ad = ads.get(ad_id)
if not ad:
continue
# Проверяем, что у объявления имеется url
url = ad.get('url', None)
if not url:
continue
# Подставляет шаблонные параметры, если имеются
final_url = set_params(url, ad)
# Достаем utm_*
utms = extract_utms(final_url)
# Проверяем что есть хоть какая-то статистика
if not ad_stats.get('stats'):
continue
# pop - забрать из словаря (прям забрать) значение по ключу.
ad_stat = ad_stats.pop('stats')
# Обходим пришедшие по объявлению данные
for day_stat in ad_stat:
# Готовим дополнение для словаря записи строки статистики
upd = {
'date': day_stat.pop('day'),
'ad_id': ad_id,
'campaign_id': ads[ad_id]['camp']['id'],
'account_id': account_id
}
# Объединяем все данные
rec = {**utms, **day_stat,**upd}
vk_ads_stats.append(rec)
print(vk_ads_stats)
return vk_ads_stats
Сделаем обработчик авторизации (возврата по ссылке, с кодом для получения токена)
# @expose.handler() - помечает фунуцию обработчиком внешних запросов.
# Она будет доступна по адресу, который мы указали в vk_redir = f'https://<ваш домен>/vk_collect/get_code'
# при условии, что название сервиса будет vk_collect
@expose.handler()
async def get_code(data, **params):
# так работают обработчики в рокстат. Все входящие данные приходят в data
# а вот **params это приемник всех остальных параметров, заранее неизвестно сколько их будет
# поэтому у обработчиков запросов надо всегда добавлять словарь, куда они попадут
code = data.get('code')
token_url = token_url_tmpl.format(
id=cfg.app_id,
secret=cfg.app_secret,
code=code,
redir=cfg.app_redir)
async with aiohttp.ClientSession() as session:
async with session.get(token_url) as res:
if res.status == 200:
result = await res.json()
print(result)
common_params['access_token'] = result['access_token']
save_token(result['access_token'])
else:
print(result.status, await result.text())
Осталось дело за сохранением. Получаем описание таблицы и копируем его в код
print(td.pycode())
td_vk_stat = ch.discover('vk_stat', columns={
'utm_source': 'String',
'utm_campaign': 'String',
'utm_content': 'String',
'utm_term': 'String',
'impressions': 'Int64',
'reach': 'Int64',
'date': 'Date',
'ad_id': 'Int64',
'campaign_id': 'Int64',
'account_id': 'Int64',
'spent': 'Float64',
'clicks': 'Int64'})\
.metrics('clicks', 'reach', 'impressions', 'spent')\
.dimensions('ad_id', 'utm_content', 'account_id', 'utm_term', 'utm_campaign', 'campaign_id', 'date', 'utm_source')\
.date('date')\
.idx('account_id', 'date')
async def save_data(d1, d2, data):
await td_vk_stat.merge_tree(execute=True)
# инициализируем оброаботчик разницы
async with td_vk_stat.difference(d1, d2, data) as d:
# он является асинхронным генератором и его следует обойти циклом
async for row in d:
# добавляем в буффер на запись
td_vk_stat.push(row)
# Смотрем статистику, что у нас получилось
logger.info('diff stat', stat=d.stat)
Воркер
# @crontab('5 5 * * *') - стандартрный формат кронтаб,
# где описывается расписание запуска. Тут указано запускать 5:05 ежедневно
@crontab('5 5 * * *')
# @worker() - делает функцию "воркером", если в ней имеется бесконечный цикл. Запускает при старте
@worker()
async def work():
logger.info('auth link:', link=vk_auth)
token = load_token()
if not token:
logger.warn('authorize please')
return
print(token)
common_params['access_token'] = token
d1 = arrow.now().shift(days=-2).format('YYYY-MM-DD')
d2 = arrow.now().shift(days=-1).format('YYYY-MM-DD')
data = await load_data(d1, d2)
await save_data(d1, d2, data)
Достаем свои параметры
Будьте аккуратны, aiohttp очень быстрый, можно легко попасть на
{
'error': {
'error_code': 9,
'error_msg': 'Flood control',
'request_params': [
{'key': 'oauth', 'value': '1'},
{'key': 'method', 'value': 'ads.getAds'},
{'key': 'v', 'value': '5.92'},
{'key': 'account_id', 'value': '1603421955'},
{'key': 'include_deleted', 'value': '0'},
{'key': 'campaign_ids', 'value': '[1010819423, 1010920266]'}
]
}
}
В этом случае используйте
await asyncio.sleep(1)
длительность ожидания придется подобрать
Скринкаст полного цикла создания сервиса
Запросы:
SELECT
$timeSeries as t,
sum(impressions)
FROM $table
WHERE $timeFilter
GROUP BY t
ORDER BY t
---
SELECT
$timeSeries as t,
sum(spent)
FROM $table
WHERE $timeFilter
GROUP BY t
ORDER BY t
---
$columns(
utm_content,
sum(impressions) c)
FROM $table
---
$columns(
utm_term,
sum(impressions) c)
FROM $table
На этом все. Ждите новых серий :)