|
1 |
| -# Copyright (C) 2023 KMEE Informatica LTDA |
2 |
| -# License AGPL-3 or later (http://www.gnu.org/licenses/agpl) |
| 1 | +# Copyright (C) 2025-Today - Engenere (<https://engenere.one>). |
| 2 | +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
| 3 | +import base64 |
| 4 | +from io import BytesIO |
3 | 5 |
|
4 |
| -import logging |
5 |
| -import re |
| 6 | +from brazilfiscalreport.danfe import Danfe |
6 | 7 |
|
7 |
| -from erpbrasil.transmissao import TransmissaoSOAP |
8 |
| -from nfelib.nfe.ws.edoc_legacy import NFeAdapter as edoc_nfe |
9 |
| -from requests import Session |
| 8 | +from odoo import _, fields, models |
| 9 | +from odoo.exceptions import UserError |
10 | 10 |
|
11 |
| -from odoo import _, api, fields, models |
12 |
| - |
13 |
| -from ..tools import utils |
14 |
| - |
15 |
| -_logger = logging.getLogger(__name__) |
| 11 | +from ..constants.dfe import ( |
| 12 | + OPERATION_TYPE, |
| 13 | + SITUACAO_NFE, |
| 14 | +) |
16 | 15 |
|
17 | 16 |
|
18 | 17 | class DFe(models.Model):
|
19 | 18 | _name = "l10n_br_fiscal.dfe"
|
| 19 | + _description = "DF-e" |
20 | 20 | _inherit = ["mail.thread", "mail.activity.mixin"]
|
21 |
| - _description = "Consult DF-e" |
22 | 21 | _order = "id desc"
|
23 | 22 | _rec_name = "display_name"
|
24 | 23 |
|
25 |
| - display_name = fields.Char(compute="_compute_display_name") |
| 24 | + dfe_access_key_id = fields.Many2one( |
| 25 | + comodel_name="l10n_br_fiscal.dfe_access_key", string="Chave de Acesso" |
| 26 | + ) |
26 | 27 |
|
27 |
| - company_id = fields.Many2one(comodel_name="res.company", string="Company") |
| 28 | + key = fields.Char(string="Access Key", size=44, related="dfe_access_key_id.key") |
28 | 29 |
|
29 |
| - version = fields.Selection(related="company_id.dfe_version") |
| 30 | + serie = fields.Char(size=3, index=True) |
30 | 31 |
|
31 |
| - environment = fields.Selection(related="company_id.dfe_environment") |
| 32 | + number = fields.Float(string="Document Number", index=True, digits=(18, 0)) |
32 | 33 |
|
33 |
| - last_nsu = fields.Char(string="Last NSU", size=25, default="0") |
| 34 | + emitter = fields.Char(size=60) |
34 | 35 |
|
35 |
| - last_query = fields.Datetime(string="Last query") |
| 36 | + cnpj_cpf = fields.Char(string="CNPJ/CPF", size=18) |
36 | 37 |
|
37 |
| - imported_document_ids = fields.One2many( |
38 |
| - comodel_name="l10n_br_fiscal.document", |
39 |
| - inverse_name="dfe_id", |
40 |
| - string="Imported Documents", |
| 38 | + nsu = fields.Char(string="NSU", size=25, index=True) |
| 39 | + |
| 40 | + operation_type = fields.Selection( |
| 41 | + selection=OPERATION_TYPE, |
41 | 42 | )
|
42 | 43 |
|
43 |
| - use_cron = fields.Boolean( |
44 |
| - default=False, |
45 |
| - string="Download new documents automatically", |
46 |
| - help="If activated, allows new manifestations to be automatically " |
47 |
| - "searched with a Cron", |
| 44 | + document_value = fields.Float( |
| 45 | + string="Document Total Value", |
| 46 | + readonly=True, |
| 47 | + digits=(18, 2), |
48 | 48 | )
|
49 | 49 |
|
50 |
| - @api.depends("company_id.name", "last_nsu") |
51 |
| - def name_get(self): |
52 |
| - return self.mapped(lambda d: (d.id, f"{d.company_id.name} - NSU: {d.last_nsu}")) |
53 |
| - |
54 |
| - @api.model |
55 |
| - def _get_processor(self): |
56 |
| - certificado = self.env.company._get_br_ecertificate() |
57 |
| - session = Session() |
58 |
| - session.verify = False |
59 |
| - return edoc_nfe( |
60 |
| - TransmissaoSOAP(certificado, session), |
61 |
| - self.company_id.state_id.ibge_code, |
62 |
| - versao=self.version, |
63 |
| - ambiente=self.environment, |
64 |
| - ) |
| 50 | + ie = fields.Char(string="Inscrição estadual", size=18) |
65 | 51 |
|
66 |
| - @api.model |
67 |
| - def validate_distribution_response(self, result): |
68 |
| - valid = False |
69 |
| - message = result.resposta.xMotivo |
70 |
| - if result.retorno.status_code != 200: |
71 |
| - code = result.retorno.status_code |
72 |
| - elif result.resposta.cStat != "138": |
73 |
| - code = result.resposta.cStat |
74 |
| - else: |
75 |
| - valid = True |
76 |
| - |
77 |
| - if not valid: |
78 |
| - self.message_post( |
79 |
| - body=_( |
80 |
| - _( |
81 |
| - "Error validating document distribution:" |
82 |
| - "\n\n%(code)s - %(message)s", |
83 |
| - code=code, |
84 |
| - message=message, |
85 |
| - ) |
86 |
| - ) |
87 |
| - ) |
| 52 | + partner_id = fields.Many2one( |
| 53 | + comodel_name="res.partner", |
| 54 | + string="Supplier (partner)", |
| 55 | + ) |
88 | 56 |
|
89 |
| - return valid |
| 57 | + company_id = fields.Many2one( |
| 58 | + comodel_name="res.company", |
| 59 | + string="Company", |
| 60 | + default=lambda self: self.env.company, |
| 61 | + readonly=True, |
| 62 | + ) |
90 | 63 |
|
91 |
| - @api.model |
92 |
| - def _document_distribution(self): |
93 |
| - maxNSU = "" |
94 |
| - while maxNSU != self.last_nsu: |
95 |
| - try: |
96 |
| - result = self._get_processor().consultar_distribuicao( |
97 |
| - cnpj_cpf=re.sub("[^0-9]", "", self.company_id.cnpj_cpf), |
98 |
| - ultimo_nsu=utils.format_nsu(self.last_nsu), |
99 |
| - ) |
100 |
| - except Exception as e: |
101 |
| - self.message_post( |
102 |
| - body=_("Error on searching documents.\n%(error)s", error=e) |
103 |
| - ) |
104 |
| - break |
| 64 | + emission_datetime = fields.Datetime( |
| 65 | + string="Emission Date", |
| 66 | + index=True, |
| 67 | + default=fields.Datetime.now, |
| 68 | + ) |
105 | 69 |
|
106 |
| - self.write( |
107 |
| - { |
108 |
| - "last_nsu": result.resposta.ultNSU, |
109 |
| - "last_query": fields.Datetime.now(), |
110 |
| - } |
111 |
| - ) |
| 70 | + inclusion_datetime = fields.Datetime( |
| 71 | + string="Inclusion Date", |
| 72 | + index=True, |
| 73 | + default=fields.Datetime.now, |
| 74 | + ) |
112 | 75 |
|
113 |
| - if not self.validate_distribution_response(result): |
114 |
| - break |
| 76 | + inclusion_mode = fields.Char(size=255) |
115 | 77 |
|
116 |
| - self._process_distribution(result) |
| 78 | + document_state = fields.Selection( |
| 79 | + selection=SITUACAO_NFE, |
| 80 | + index=True, |
| 81 | + ) |
117 | 82 |
|
118 |
| - maxNSU = result.resposta.maxNSU |
| 83 | + cfop_ids = fields.Many2many( |
| 84 | + comodel_name="l10n_br_fiscal.cfop", |
| 85 | + string="CFOPs", |
| 86 | + ) |
119 | 87 |
|
120 |
| - @api.model |
121 |
| - def _process_distribution(self, result): |
122 |
| - """Method to process the distribution data.""" |
| 88 | + dfe_nfe_document_type = fields.Selection( |
| 89 | + selection=[ |
| 90 | + ("dfe_nfe_complete", "NF-e Completa"), |
| 91 | + ("dfe_nfe_summary", "Resumo da NF-e"), |
| 92 | + ("dfe_nfe_event", "Evento da NF-e"), |
| 93 | + ], |
| 94 | + string="DFe Document Type", |
| 95 | + ) |
123 | 96 |
|
124 |
| - @api.model |
125 |
| - def _parse_xml_document(self, document): |
126 |
| - schema_type = document.schema.split("_")[0] |
127 |
| - method = "parse_%s" % schema_type |
128 |
| - if not hasattr(self, method): |
129 |
| - return |
| 97 | + dfe_monitor_id = fields.Many2one( |
| 98 | + comodel_name="l10n_br_fiscal.dfe_monitor", |
| 99 | + string="DFe Monitor", |
| 100 | + ) |
130 | 101 |
|
131 |
| - xml = utils.parse_gzip_xml(document.valueOf_) |
132 |
| - return getattr(self, method)(xml) |
| 102 | + attachment_id = fields.Many2one(comodel_name="ir.attachment") |
133 | 103 |
|
134 |
| - @api.model |
135 |
| - def _download_document(self, nfe_key): |
136 |
| - try: |
137 |
| - result = self._get_processor().consultar_distribuicao( |
138 |
| - chave=nfe_key, cnpj_cpf=re.sub("[^0-9]", "", self.company_id.cnpj_cpf) |
| 104 | + document_id = fields.Many2one( |
| 105 | + comodel_name="l10n_br_fiscal.document", |
| 106 | + string="Fiscal Document", |
| 107 | + ) |
| 108 | + |
| 109 | + def name_get(self): |
| 110 | + result = [] |
| 111 | + for rec in self: |
| 112 | + document_type = dict(rec._fields["dfe_nfe_document_type"].selection).get( |
| 113 | + rec.dfe_nfe_document_type |
139 | 114 | )
|
| 115 | + result.append( |
| 116 | + ( |
| 117 | + rec.id, |
| 118 | + f"{rec.key} - {document_type}", |
| 119 | + ) |
| 120 | + ) |
| 121 | + return result |
| 122 | + |
| 123 | + def create_xml_attachment(self, xml): |
| 124 | + file_name = "NFe%s.xml" % self.key |
| 125 | + self.attachment_id = self.env["ir.attachment"].create( |
| 126 | + { |
| 127 | + "name": file_name, |
| 128 | + "datas": base64.b64encode(xml), |
| 129 | + "store_fname": file_name, |
| 130 | + "description": "NFe via Manifesto", |
| 131 | + "res_model": self._name, |
| 132 | + "res_id": self.id, |
| 133 | + } |
| 134 | + ) |
| 135 | + |
| 136 | + def action_download_xml(self): |
| 137 | + if len(self) == 1: |
| 138 | + return self.download_attachment(self.attachment_id) |
| 139 | + |
| 140 | + compressed_attachment_id = ( |
| 141 | + self.env["l10n_br_fiscal.attachment"] |
| 142 | + .create([]) |
| 143 | + .build_compressed_attachment(self.mapped("attachment_id")) |
| 144 | + ) |
| 145 | + return self.download_attachment(compressed_attachment_id) |
| 146 | + |
| 147 | + def download_attachment(self, attachment_id): |
| 148 | + return { |
| 149 | + "type": "ir.actions.act_url", |
| 150 | + "url": ( |
| 151 | + f"/web/content/{attachment_id.id}" |
| 152 | + f"/{attachment_id.name}?download=true" |
| 153 | + ), |
| 154 | + "target": "self", |
| 155 | + } |
| 156 | + |
| 157 | + def import_document(self): |
| 158 | + self.ensure_one() |
| 159 | + try: |
| 160 | + document = self.dfe_monitor_id._download_document(self.key) |
| 161 | + document_id = self.dfe_monitor_id._parse_xml_document(document) |
140 | 162 | except Exception as e:
|
141 | 163 | self.message_post(
|
142 |
| - body=_("Error on searching documents.\n%(error)s", error=e) |
| 164 | + body=_("Error importing document: \n\n %(error)s", error=e) |
143 | 165 | )
|
144 | 166 | return
|
| 167 | + if document_id: |
| 168 | + document_id = self.id |
| 169 | + self.document_id = document_id |
| 170 | + |
| 171 | + def import_document_multi(self): |
| 172 | + for rec in self: |
| 173 | + rec.import_document() |
| 174 | + |
| 175 | + def make_pdf(self): |
| 176 | + if self.dfe_nfe_document_type != "dfe_nfe_complete": |
| 177 | + raise UserError(_("Can only generate DANFE when DF-e is complete.")) |
| 178 | + nfe_xml = base64.b64decode(self.attachment_id.datas) |
| 179 | + |
| 180 | + danfe = Danfe(xml=nfe_xml) |
| 181 | + |
| 182 | + tmpDanfe = BytesIO() |
| 183 | + danfe.output(tmpDanfe) |
| 184 | + danfe_file = tmpDanfe.getvalue() |
| 185 | + tmpDanfe.close() |
| 186 | + |
| 187 | + pdf_attachment = self.env["ir.attachment"].create( |
| 188 | + { |
| 189 | + "name": "DANFE.pdf", |
| 190 | + "type": "binary", |
| 191 | + "datas": base64.b64encode(danfe_file), |
| 192 | + "res_model": self._name, |
| 193 | + "res_id": self.id, |
| 194 | + "mimetype": "application/pdf", |
| 195 | + } |
| 196 | + ) |
145 | 197 |
|
146 |
| - if not self.validate_distribution_response(result): |
147 |
| - return |
148 |
| - |
149 |
| - return result.resposta.loteDistDFeInt.docZip[0] |
150 |
| - |
151 |
| - @api.model |
152 |
| - def _cron_search_documents(self): |
153 |
| - self.search([("use_cron", "=", True)]).search_documents() |
154 |
| - |
155 |
| - def search_documents(self): |
156 |
| - for record in self: |
157 |
| - record._document_distribution() |
| 198 | + return { |
| 199 | + "type": "ir.actions.act_url", |
| 200 | + "url": f"/web/content/{pdf_attachment.id}?download=true", |
| 201 | + "target": "self", |
| 202 | + } |
0 commit comments