python-inema/inema/am_exchange.py

318 lines
12 KiB
Python

#!/usr/bin/python
from datetime import datetime
from zeep import Client
from zeep.wsse.username import UsernameToken
import logging
import inema
import pkg_resources
_logger = logging.getLogger(__name__)
class AmAddress(object):
"""Representing an Address"""
def __init__(self, am, street, street_number, zipcode, city, cc):
self._am = am
self._street = street
self._street_number = street_number
self._zip = zipcode
self._city = city
self._cc = cc
def to_zeep(self):
ztype = self._am.get_type_common('AddressType')
return ztype(StreetName = self._street, StreetNumber = self._street_number,
Zip = self._zip, CityName = self._city, CC = self._cc)
class AmContact(object):
"""Representing a Contact Person"""
def __init__(self, am, addr, first_name, last_name, position = None, email = None,
phone = None, fax = None):
self._am = am
self._addr = addr
self._first_name = first_name
self._last_name = last_name
self._position = position
self._email = email
self._phone = phone
self._fax = fax
def to_zeep(self):
ztype = self._am.get_type_common('ContactType')
return ztype(FirstName = self._first_name, LastName = self._last_name,
Position = self._position, Address = self._addr.to_zeep(),
Email = self._email, Phone = self._phone, Fax = self._fax)
class AmPeer(object):
def __init__(self, am, name, addr, customer_id = None, contact = None, customs_nr = None):
self._am = am
self.name = name
self.addr = addr
self.customer_id = customer_id
self.contact = contact
self.customs_nr = customs_nr
def to_cust_details(self):
"""Generate a Zeep object for CustDetailsType"""
ztype = self._am.get_type_common('CustDetailsType')
return ztype(CustID = self.customer_id, Name = self.name,
Address = self.addr.to_zeep(),
Contact = self.contact.to_zeep())
def to_cust_details_CN(self):
"""Generate a Zeep object for CustDetailsTypeWithCustomsNumber"""
ztype = self._am.get_type_common('CustDetailsTypeWithCustomsNumber')
return ztype(CustID = self.customer_id, Name = self.name,
Address = self.addr.to_zeep(),
Contact = self.contact.to_zeep(), CustomsNumber = self.customs_nr)
def to_submitter_sms(self):
"""Generate a Zeep object for SubmitterSMSType"""
ztype = self._am.get_type_common('SubmitterSMSType')
return ztype(CustID = self.customer_id, Name = self.name,
Address = self.addr.to_zeep(),
Contact = self.contact.to_zeep())
def to_consignee(self):
"""Generate a Zeep object for ConsigneeType"""
ztype = self._am.get_type_common('ConsigneeType')
return ztype(Name = self.name, CustomsNumber = self.customs_nr,
Contact = self.contact.to_zeep())
class AmAmount(object):
def __init__(self, am, amount, currency = "EUR"):
self._am = am
self.amount = amount
self.currency = currency
def to_cod_amount(self):
ztype = self._am.get_type_common('CODAmountValueType')
return ztype(value = self.amount, currency = self.currency);
def to_insured_value(self):
ztype = self._am.get_type_common('InsuredValueType')
return ztype(value = self.amount, currency = self.currency);
def to_item_total_amount(self):
ztype = self._am.get_type_common('ItemTotalAmountType')
return ztype(value = self.amount, currency = self.currency);
def to_postage_amount(self):
ztype = self._am.get_type_common('PostageAmountValueType')
return ztype(value = self.amount, currency = self.currency);
def to_total_amount(self):
ztype = self._am.get_type_common('TotalAmountType')
return ztype(value = self.amount, currency = self.currency);
class AmGWM(object):
"""Gross Weight"""
def __init__(self, am, value, unit = "kg"):
self._am = am
self.value = value
self.unit = unit
def to_zeep(self):
ztype = self._am.get_type_common('GWMType')
return ztype(value = self.value, unit = self.unit)
class AmContentPiece(object):
def __init__(self, am, qty, description, value, weight, origin_cc, hstariff = None,
weight_unit = "kg", currency = "EUR"):
self._am = am
self.qty = qty
self.description = description
self.value = value
self.weight = weight
self.origin_cc = origin_cc
self.weight_unit = weight_unit
self.currency = currency
self.hstariff = hstariff
def to_zeep(self, pos):
ztype = self._am.get_type_common('ContentPieceType')
return ztype(num = pos, qty = self.qty, desc = self.description,
value = self.value, currency = self.currency,
weight = self.weight, unit = self.weight_unit,
origin = self.origin_cc, hstariff = self.hstariff)
class AmContent(object):
def __init__(self, am, postage_amount, items = []):
self._am = am
self._postage_amount = postage_amount
self.items = items
def add_item(self, item):
_logger.info("Adding position to shipment: %s", item)
self.items.append(item)
def build_nature(self):
ztype = self._am.get_type_common('NatureOfGoodsType')
return ztype(gift = False, doc = False, sample = False, returnedGood = False, other = True)
def to_zeep(self):
pieces = []
total_value = 0.0
total_weight = 0.0
# iterate over list of items, populating pieces[] and updating totals
i = 1
for it in self.items:
pieces.append(it.to_zeep(i))
total_value += it.value
total_weight += it.weight
i += 1
ztype = self._am.get_type_common('ContentType')
return ztype(NatureOfGoods = self.build_nature(),
GWM = AmGWM(self._am, total_weight).to_zeep(),
TotalValue = AmAmount(self._am, total_value).to_item_total_amount(),
PostageAmount = AmAmount(self._am, self._postage_amount).to_postage_amount(),
ContentPiece = pieces)
class AmShipment(object):
def __init__(self, am, ship_id, consignee, content, stype = "EIZ"):
self._am = am
self._ship_id = ship_id
self._consignee = consignee
self._type = stype
self._description = None
self._content = content
def to_zeep(self):
def build_postage(self):
ztype = self._am.get_type_common('PostageType')
amount = AmAmount(self._am, self._content._postage_amount)
return ztype(Type = "PC", # Internetmarke
Amount = amount.to_postage_amount())
def build_dst(self):
ztype = self._am.get_type_common('DstType')
dst = ztype(Consignee = self._consignee.to_consignee(),
Content = self._content.to_zeep())
dst['from'] = 'bla'
return dst
def build_destination(self):
ztype = self._am.get_type_common('DestinationType')
return ztype(prec = "0", Dst = build_dst(self))
ztype = self._am.get_type_common('ShipmentCreateRequestType')
return ztype(ShipmentID = self._ship_id, Desc = self._description, Type = self._type,
Qty = 1, Postage = build_postage(self),
Destination = build_destination(self))
class AM(object):
_wsdl_url = pkg_resources.resource_filename('inema', 'data/XML-Schemas') + '/V4.4.01/Abnahme/OrderManagement.wsdl'
_next_orderid = 0
def __init__(self, username, password, prod_plant_id):
self._urn_common = '{urn:www-deutschepost-de:OrderManagement/CertificationOrderManagement/4.4/common}'
self._urn_xsd = '{http://www.w3.org/2001/XMLSchema}'
self.client = Client(self._wsdl_url, wsse=UsernameToken(username, password))
self._username = username
self._password = password
self.production_plant_id = prod_plant_id
self._submitter = None
self._originator = None
self._payer = None
def set_submitter(self, orig):
self._submitter = orig
def get_submitter(self):
return self._submitter
def set_originator(self, orig):
self._originator = orig
def get_originator(self):
return self._originator or self._submitter
def set_payer(self, payer):
self._payer = payer
def get_payer(self):
return self._payer or self._submitter
def urn_common(self, ptype):
"""Construct a URN for the given 'common' type name"""
return self._urn_common + ptype
def get_type_common(self, ptype):
"""Return a zeep type object for the given 'common' type name"""
return self.client.get_type(self.urn_common(ptype))
def get_type_xsd(self, ptype):
return self.client.get_type(self._urn_xsd + ptype)
def build_msgid(self):
ztype = self.get_type_common('MessageIdType')
now = datetime.now()
# something like "20190116082632A01234567"
return ztype(now.strftime("%Y%m%d%H%M%S") + "A" + "01234567") # FIXME
def build_ctime(self):
ztype = self.get_type_xsd('dateTime')
now = datetime.now().replace(microsecond=0)
# something like "2019-01-16T08:26:32"
return ztype(now)
def build_origin(self):
ztype = self.get_type_common('OriginType')
return ztype(SystemName = "python-inema", SystemVersion = inema.__version__, CertificationDate = "1970-01-01")
def build_hdr_req(self):
ztype = self.get_type_common('MsgHeaderRequestType')
return ztype(User = self._username, Password = self._password, MsgID = self.build_msgid(),
CreationDateTime = self.build_ctime(), Receiver = "DPAG",
SubmitterSMS = self._submitter.to_submitter_sms(), Origin = self.build_origin())
def build_cust_order_id(self):
ztype = self.get_type_common('CustOrderIDType')
now = datetime.now()
# "160119" + "A" + "000001" + "01"
sysid = now.strftime("%y%m%d") + "A" + "000001" + "01" # FIXME
return ztype(CustID = self._submitter.customer_id, SystemID = sysid)
def build_order_hdr_req(self):
ztype = self.get_type_common('OrderHeaderRequestCreateType')
return ztype(OrderType = "EA", State = "AU", Released = True,
CustOrderID = self.build_cust_order_id())
def build_payer(self):
def build_payment_means(self):
ztype = self.get_type_common('PaymentMeansType')
# "Sie sollten einen Kontrakt im Verfahren 51 haben. Den erhalten Sie bei Vertragsabschluss. Diesen
# würden Sie dann so eingeben... "
return ztype(Procedure = "51", Participation = "01")
ztype = self.get_type_common('PayerType')
payer = self.get_payer()
return ztype(CustID = payer.customer_id, Name = payer.name,
Address = payer.addr.to_zeep(),
Contact = payer.contact.to_zeep(), # Contract?
PaymentMeans = build_payment_means(self))
def build_parties(self):
ztype = self.get_type_common('PartiesType')
return ztype(Originator = self.get_originator().to_cust_details_CN(),
Submitter = self.get_submitter().to_cust_details(),
Payer = self.build_payer())
def build_induction(self):
ztype = self.get_type_common('InductionType')
return ztype(TransitDirectionCode = 'E', # Einlieferung
ProductionPlantID = self.production_plant_id,
EarliestDateTime = self.build_ctime())
def build_create_order_req(self, item):
ztype = self.get_type_common('CreateOrderRequestType')
return ztype(version = "1.0", testcase = True,
MsgHeader = self.build_hdr_req(), OrderHeader = self.build_order_hdr_req(),
Parties = self.build_parties(), Induction = self.build_induction(),
ShipmentItem = item.to_zeep())