Complex EntityID mapping

In one of our projects, we needed ACL (Anti-Corruption Layer) for external entity-id and internal. Generally mapping one to another but based on the complex external id that differs depending on the platform that it came from. Below you have examples of these.

@dataclass(frozen=True)
class AmazonId:
    asin: Text
    sku: Text
    site: Text
    merchant_id: Text
    asdict = asdict


@dataclass(frozen=True)
class CDiscountId:
    sku: Text
    user_id: int
    asdict = asdict


@dataclass(frozen=True)
class EbayId:
    item_id: Text
    sku: Text
    asdict = asdict


Identity = Union[AmazonId, CDiscountId, EbayId]

Table for mappings

To store these mappings we decided to use the JSON field in DB, but for filtering and comparisons, we decided to use a binary field that will store the MD5 value from this JSON. The table will look like below.

class Platform(Enum):
    AMAZON = 'Amazon'
    EBAY = 'eBay'
    CDISCOUNT = 'CDiscount'


sa.Table('mappings', metadata,
    id = sa.Column(sa.BigInteger, primary_key=True)
    platform = sa.Column(sa.Enum(Platform), nullable=False)
    identity = sa.Column(sa.JSON, nullable=False)
    digest = sa.Column(sa.VARBINARY(16), nullable=False)
)

Mapping model

I always want to hide the persistence implementation. So Mapping model should hide all these additional columns and present pure mapping of internal id to platform identity.

To achieve it I’m using the hybrid property with a comparator. With comparator, I can query this table based on identity, but use MD5 for it in DB. Also, the hybrid property allows me to pack and unpack identity-based proper columns (platform and JSON).

class Mapping(Base):
    __tablename__ = 'mappings'
    __table_args__ = (
        sa.UniqueConstraint('platform', 'digest', name='platform_identity'),
    )

    class Platform(Enum):
        AMAZON = 'Amazon'
        EBAY = 'eBay'
        CDISCOUNT = 'CDiscount'

    id = sa.Column(sa.BigInteger, primary_key=True)
    _platform = sa.Column('platform', sa.Enum(Platform), nullable=False)
    _dict = sa.Column('identity', sa.JSON, nullable=False)
    _digest = sa.Column('digest', sa.VARBINARY(16), nullable=False)

    @classmethod
    def get_platform(cls, identity: Identity) -> Platform:
        if isinstance(identity, AmazonId):
            return Mapping.Platform.AMAZON
        elif isinstance(identity, CDiscountId):
            return Mapping.Platform.CDISCOUNT
        elif isinstance(identity, EbayId):
            return Mapping.Platform.EBAY
        raise NotImplementedError(identity)

    @classmethod
    def digest(cls, identity: Identity) -> bytes:
        identity_json = json.dumps(
            identity.asdict(), sort_keys=True,
        ).encode("utf-8")
        return hashlib.md5(identity_json).digest()

    @hybrid_property
    def identity(self) -> Identity:
        if self._platform == Mapping.Platform.AMAZON:
            return AmazonId(**self._dict)
        elif self._platform == Mapping.Platform.CDISCOUNT:
            return CDiscountId(**self._dict)
        elif self._platform == Mapping.Platform.EBAY:
            return EbayId(**self._dict)
        else:
            raise NotImplementedError(self._platform)

    @identity.setter
    def identity(self, value: Identity) -> None:
        self._platform = self.get_platform(value)
        self._dict = value.asdict()
        self._digest = self.digest(value)

    @identity.comparator
    def identity(self) -> Mapping.IdentityComparator:
        return Mapping.IdentityComparator(self._digest)

    class IdentityComparator(Comparator):
        def __eq__(self, other: Identity) -> bool:
            other_digest = Mapping.digest(other)
            return self.__clause_element__() == other_digest

ACL service

Finally service which operates only on identity and internal id. Simple and ignoring all persistency implementation.

class Acl:
    def __init__(self, session: Session) -> None:
        self._session = session

    def add(self, mapped_id: int, identity: Identity) -> None:
        model = Mapping(id=mapped_id, identity=identity)
        self._session.add(model)
        self._session.flush()

    def get_id(self, identity: Identity) -> Optional[int]:
        mapping = self._session.query(Mapping).filter_by(
            identity=identity,
        ).one_or_none()
        return mapping and mapping.id

    def get_identity(self, mapped_id: int) -> List[Identity]:
        query = self._session.query(Mapping).filter(
            Mapping.id == mapped_id,
        )
        return [mapping.identity for mapping in query]

Full code with tests you can find here.

close

Hi there 👋
It’s nice to meet you.

Sign up to join my mailing list.

I don’t spam!