-
-
Notifications
You must be signed in to change notification settings - Fork 808
/
Copy pathsql_export.py
188 lines (173 loc) · 6.96 KB
/
sql_export.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# Copyright 2019 Akretion
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from datetime import datetime, timedelta
from odoo import SUPERUSER_ID, _, api, fields, models
from odoo.exceptions import UserError
class SqlExport(models.Model):
    """Extend ``sql.export`` so a query result can be e-mailed on a schedule.

    Adds the recipients (users and partners), the crons that trigger the
    sending, and the condition under which a mail is actually sent.
    """

    _inherit = "sql.export"

    mail_user_ids = fields.Many2many(
        "res.users",
        "mail_user_sqlquery_rel",
        "sql_id",
        "user_id",
        "User to notify",
        help="Add the users who want to receive the report by e-mail. You "
        "need to link the sql query with a cron to send mail automatically",
    )
    mail_partner_ids = fields.Many2many(
        "res.partner",
        "mail_partner_sqlquery_rel",
        "sql_id",
        "partner_id",
        help="Add the partners who wants to receive the report by e-mail. You "
        "need to link the sql query with a cron to send a mail automatically",
    )
    cron_ids = fields.Many2many(
        "ir.cron",
        "cron_sqlquery_rel",
        "sql_id",
        "cron_id",
        "Crons",
        groups="base.group_system",
    )
    # We could implement other conditions, that is why it is a selection field
    mail_condition = fields.Selection(
        [("not_empty", "File Not Empty")], default="not_empty"
    )

    def _prepare_cron_mail(self):
        """Return the ``ir.cron`` creation values for this export.

        The ``code`` value is only a placeholder without arguments:
        ``create_cron`` rewrites it once the cron id is known.
        """
        self.ensure_one()
        return {
            "active": True,
            "model_id": self.env.ref("sql_export.model_sql_export").id,
            "state": "code",
            "code": "model._run_all_sql_export_for_cron()",
            "name": f"SQL Export : {self.name}",
            # First run is scheduled two hours after the cron is created.
            "nextcall": datetime.now() + timedelta(hours=2),
            "doall": False,
            "numbercall": -1,
            "user_id": SUPERUSER_ID,
        }

    def create_cron(self):
        """Create a cron dedicated to this export and link it to the record."""
        self.ensure_one()
        cron = self.env["ir.cron"].create(self._prepare_cron_mail())
        # We need to pass cron_id in the cron args because a cron is not
        # aware of itself in the end method and we need it to find all
        # linked sql exports
        cron.write({"code": f"model._run_all_sql_export_for_cron([{cron.id}])"})
        self.write({"cron_ids": [(4, cron.id)]})

    def send_mail(self, params=None):
        """Export the query result to a file and queue it as a mail attachment.

        :param params: optional dict of query variables. The keys ``user_id``
            and ``company_id``, when present, are also forwarded to the export
            wizard through the ``force_user`` / ``force_company`` context keys.
        :return: None. Nothing is sent when ``mail_condition`` is
            ``not_empty`` and the query returns no row.
        """
        self.ensure_one()
        params = params or {}
        mail_template = self.env.ref("sql_export_mail.sql_export_mailer")
        if self.mail_condition == "not_empty" and not self._execute_sql_request(
            params=params, mode="fetchone"
        ):
            # No row at all: skip the mail entirely.
            return

        wizard = self.env["sql.file.wizard"].create({"sql_export_id": self.id})
        forced_context = {}
        if "user_id" in params:
            forced_context["force_user"] = params["user_id"]
        if "company_id" in params:
            forced_context["force_company"] = params["company_id"]
        if forced_context:
            wizard = wizard.with_context(**forced_context)
        wizard.export_sql()
        binary = wizard.binary_file
        filename = wizard.file_name

        # The mail is only queued (force_send=False) so that the generated
        # file can be attached to it before it is actually sent.
        msg_id = mail_template.send_mail(self.id, force_send=False)
        mail = self.env["mail.mail"].browse(msg_id)
        attachment = self.env["ir.attachment"].create(
            {
                "name": filename,
                "datas": binary,
                "res_model": "mail.mail",
                "res_id": mail.id,
            }
        )
        mail.write({"attachment_ids": [(4, attachment.id)]})

    @api.model
    def _run_all_sql_export_for_cron(self, cron_ids):
        """Send every valid export linked to one of the given crons.

        Depending on the placeholders found in the query text:

        * ``%(user_id)s``: one mail per user in ``mail_user_ids``, with that
          user's id (and company id if ``%(company_id)s`` is also used);
        * ``%(company_id)s`` only: one mail per company, addressed to the
          users of ``mail_user_ids`` belonging to that company;
        * neither: a single mail for all recipients.

        :param cron_ids: list of ``ir.cron`` ids used to find the exports.
        """
        exports = self.search(
            [("cron_ids", "in", cron_ids), ("state", "=", "sql_valid")]
        )
        # Looked up lazily and only once: the company list does not depend
        # on the export being processed.
        companies = None
        for export in exports:
            query = export.query or ""
            uses_company = "%(company_id)s" in query
            uses_user = "%(user_id)s" in query
            if uses_user:
                for user in export.mail_user_ids:
                    # A fresh dict per mail so that nothing leaks from one
                    # recipient to the next.
                    variables = {"user_id": user.id}
                    if uses_company:
                        variables["company_id"] = user.company_id.id
                    export.with_context(mail_to=[user.id]).send_mail(
                        params=variables
                    )
            elif uses_company:
                if companies is None:
                    companies = self.env["res.company"].search([])
                for company in companies:
                    users = export.mail_user_ids.filtered(
                        lambda u, company=company: u.company_id == company
                    )
                    if users:
                        export.with_context(mail_to=users.ids).send_mail(
                            params={"company_id": company.id}
                        )
            else:
                export.send_mail()

    @api.constrains("query_properties_definition", "mail_user_ids")
    def check_no_parameter_if_sent_by_mail(self):
        """Forbid user recipients on a query that has parameters to fill.

        :raises UserError: when both properties and mail users are set.
        """
        for export in self:
            if export.query_properties_definition and export.mail_user_ids:
                raise UserError(
                    _(
                        "It is not possible to execute and send a query "
                        "automatically by mail if there are parameters to fill"
                    )
                )

    @api.constrains("mail_user_ids")
    def check_mail_user(self):
        """Ensure every user to notify has an e-mail address.

        :raises UserError: when at least one user has no e-mail.
        """
        for export in self:
            if any(not user.email for user in export.mail_user_ids):
                raise UserError(_("The user does not have any e-mail address."))

    @api.constrains("mail_partner_ids", "query")
    def _check_mail_partner(self):
        """Validate the partner recipients of each export.

        :raises UserError: when partners are set on a query using the
            ``company_id`` or ``user_id`` placeholder, or when a partner has
            no e-mail address.
        """
        for export in self:
            partners = export.mail_partner_ids
            # Guard against an unset query so the substring test cannot fail.
            query = export.query or ""
            if partners and (
                "%(company_id)s" in query or "%(user_id)s" in query
            ):
                raise UserError(
                    _(
                        "A query that uses the company_id or user_id parameter "
                        "cannot be directly sent to a partner."
                    )
                )
            missing_email_partners = partners.filtered(
                lambda partner: not partner.email
            )
            if missing_email_partners:
                raise UserError(
                    _(
                        "Missing email address for partner(s): %(names)s",
                        names=", ".join(missing_email_partners.mapped("name")),
                    )
                )

    def get_email_address_for_template(self):
        """
        Called from mail template.
        Collects email addresses from both users and partners.

        When the ``mail_to`` context key is set, only the users with those ids
        are used and partners are ignored.

        :return: comma-separated addresses, without duplicates or empty
            values, users first, in a stable order.
        """
        self.ensure_one()
        forced_user_ids = self.env.context.get("mail_to")
        if forced_user_ids:
            mail_users = self.env["res.users"].browse(forced_user_ids)
            mail_partners = self.env["res.partner"]
        else:
            mail_users = self.mail_user_ids
            mail_partners = self.mail_partner_ids
        candidates = mail_users.mapped("email") + mail_partners.mapped("email")
        # dict.fromkeys removes duplicates while keeping first-seen order, so
        # the recipient list is identical from one run to the next.
        unique_addresses = dict.fromkeys(email for email in candidates if email)
        return ",".join(unique_addresses)