-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathweibo_tiny.py
132 lines (104 loc) · 3.73 KB
/
weibo_tiny.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: utf-8 -*-
"""Python sina weibo sdk.
https://github.com/lxyu/weibo
Rely on `requests` to do the dirty work, so it's much simpler and cleaner
than the official SDK.
For more info, refer to:
http://lxyu.github.io/weibo/
"""
from __future__ import absolute_import
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
import json
import time
import requests
class Client(object):
def __init__(self, api_key, api_secret, redirect_uri, token=None,
username=None, password=None):
# const define
self.site = 'https://api.weibo.com/'
self.authorization_url = self.site + 'oauth2/authorize'
self.token_url = self.site + 'oauth2/access_token'
self.api_url = self.site + '2/'
# init basic info
self.client_id = api_key
self.client_secret = api_secret
self.redirect_uri = redirect_uri
self.uid = None
self.token = None
self.session = requests.session()
if username and password:
self.session.auth = username, password
# activate client directly if given token
if token:
self.set_token(token)
@property
def authorize_url(self):
params = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri
}
return "{0}?{1}".format(self.authorization_url, urlencode(params))
@property
def alive(self):
if self.expires_at:
return self.expires_at > time.time()
else:
return False
def set_code(self, authorization_code):
"""Activate client by authorization_code.
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': self.redirect_uri
}
res = requests.post(self.token_url, data=params)
token = json.loads(res.text)
self._assert_error(token)
token[u'expires_at'] = int(time.time()) + int(token.pop(u'expires_in'))
self.set_token(token)
def set_token(self, token):
"""Directly activate client by access_token.
"""
self.token = token
self.uid = token['uid']
self.access_token = token['access_token']
self.expires_at = token['expires_at']
self.session.params = {'access_token': self.access_token}
def _assert_error(self, d):
"""Assert if json response is error.
"""
if 'error_code' in d and 'error' in d:
raise RuntimeError("{0} {1}".format(
d.get("error_code", ""), d.get("error", "")))
def get(self, uri, **kwargs):
"""Request resource by get method.
"""
url = "{0}{1}.json".format(self.api_url, uri)
# for username/password client auth
if self.session.auth:
kwargs['source'] = self.client_id
res = json.loads(self.session.get(url, params=kwargs).text)
self._assert_error(res)
return res
def post(self, uri, **kwargs):
"""Request resource by post method.
"""
url = "{0}{1}.json".format(self.api_url, uri)
# for username/password client auth
if self.session.auth:
kwargs['source'] = self.client_id
if "pic" not in kwargs:
res = json.loads(self.session.post(url, data=kwargs).text)
else:
files = {"pic": kwargs.pop("pic")}
res = json.loads(self.session.post(url,
data=kwargs,
files=files).text)
self._assert_error(res)
return res