Data Sources

Models

Pydantic models for representing OpenAIRE Data Source entities and related structures.

This module defines the Pydantic model for an OpenAIRE Data Source, including nested models for controlled vocabulary fields and type literals for restricted string values based on the OpenAIRE data model documentation. Reference: https://graph.openaire.eu/docs/data-model/entities/data-source

DataSourceResponse = ApiResponse[DataSource] module-attribute

Type alias for an API response containing a list of DataSource entities.

ControlledField

Bases: BaseModel

Represents a field with a controlled vocabulary, typically including a scheme and a value.

This model is used for structured data elements where the value has a specific meaning defined by an associated scheme (e.g., a PID like DOI, or a subject classification from a specific thesaurus).

Attributes:

scheme (str | None): The scheme or system defining the context of the value (e.g., "doi", "orcid", "mesh").
value (str | None): The actual value from the controlled vocabulary.

Source code in src/aireloom/models/data_source.py
class ControlledField(BaseModel):
    """Represents a field with a controlled vocabulary, typically including a scheme and a value.

    This model is used for structured data elements where the value has a specific
    meaning defined by an associated scheme (e.g., a PID like DOI, or a subject
    classification from a specific thesaurus).

    Attributes:
        scheme: The scheme or system defining the context of the value (e.g., "doi", "orcid", "mesh").
        value: The actual value from the controlled vocabulary.
    """

    scheme: str | None = None
    value: str | None = None

    model_config = ConfigDict(extra="allow")

DataSource

Bases: BaseEntity

Model representing an OpenAIRE Data Source entity.

A data source in OpenAIRE can be a repository, journal, aggregator, etc. This model captures various metadata fields associated with a data source.

Source code in src/aireloom/models/data_source.py
class DataSource(BaseEntity):
    """Model representing an OpenAIRE Data Source entity.

    A data source in OpenAIRE can be a repository, journal, aggregator, etc.
    This model captures various metadata fields associated with a data source.
    """

    originalIds: list[str] | None = Field(default_factory=list)
    pids: list[ControlledField] | None = Field(default_factory=list)
    type: ControlledField | None = None
    openaireCompatibility: str | None = None
    officialName: str | None = None
    englishName: str | None = None
    websiteUrl: str | None = None
    logoUrl: str | None = None
    dateOfValidation: str | None = None
    description: str | None = None
    subjects: list[str] | None = Field(default_factory=list)
    languages: list[str] | None = Field(default_factory=list)
    contentTypes: list[str] | None = Field(default_factory=list)
    releaseStartDate: str | None = None
    releaseEndDate: str | None = None
    accessRights: AccessRightType | None = None
    uploadRights: AccessRightType | None = None
    databaseAccessRestriction: DatabaseRestrictionType | None = None
    dataUploadRestriction: str | None = None
    versioning: bool | None = None
    citationGuidelineUrl: str | None = None
    pidSystems: str | None = None
    certificates: str | None = None
    policies: list[str] | None = Field(default_factory=list)
    missionStatementUrl: str | None = None
    # Added based on documentation/analysis
    journal: Container | None = None

    model_config = ConfigDict(extra="allow")

Filters

DataSourcesFilters

Bases: BaseModel

Filter model for the Data Sources API endpoint.

Attributes:

search (str | None): Search term for the data source.
officialName (str | None): Official name of the data source.
englishName (str | None): English name of the data source.
legalShortName (str | None): Legal short name of the data source.
id (str | None): OpenAIRE ID of the data source.
pid (str | None): Persistent identifier for the data source.
subjects (list[str] | None): List of subjects associated with the data source.
dataSourceTypeName (str | None): Type name of the data source.
contentTypes (list[str] | None): List of content types available in the data source.
openaireCompatibility (str | None): Compatibility status with OpenAIRE standards.
relOrganizationId (str | None): Related organization ID.
relCommunityId (str | None): Related community ID.
relCollectedFromDatasourceId (str | None): ID of the data source from which this was collected.

Source code in src/aireloom/endpoints.py
class DataSourcesFilters(BaseModel):
    """Filter model for the Data Sources API endpoint.

    Attributes:
        search (str | None): Search term for the data source.
        officialName (str | None): Official name of the data source.
        englishName (str | None): English name of the data source.
        legalShortName (str | None): Legal short name of the data source.
        id (str | None): OpenAIRE id for the data source.
        pid (str | None): Persistent identifier for the data source.
        subjects (list[str] | None): List of subjects associated with the data source.
        dataSourceTypeName (str | None): Type name of the data source.
        contentTypes (list[str] | None): List of content types available in the data source.
        openaireCompatibility (str | None): Compatibility status with OpenAIRE standards.
        relOrganizationId (str | None): Related organization ID.
        relCommunityId (str | None): Related community ID.
        relCollectedFromDatasourceId (str | None): ID of the datasource from which this was collected.
    """

    search: str | None = None
    officialName: str | None = None
    englishName: str | None = None
    legalShortName: str | None = None
    id: str | None = None
    pid: str | None = None
    subjects: list[str] | None = None
    dataSourceTypeName: str | None = None
    contentTypes: list[str] | None = None
    openaireCompatibility: str | None = None
    relOrganizationId: str | None = None
    relCommunityId: str | None = None
    relCollectedFromDatasourceId: str | None = None

    model_config = ConfigDict(extra="forbid")

Client

Client for interacting with the OpenAIRE Data Sources API endpoint.

This module provides the DataSourcesClient for accessing OpenAIRE's data source information. Like the OrganizationsClient, it currently uses custom implementations for get, search, and iterate operations.

DataSourcesClient

Bases: BaseResourceClient

Client for the OpenAIRE Data Sources API endpoint.

This client allows interaction with OpenAIRE's data source entities, offering methods for retrieval (get), searching (search), and iteration (iterate). It currently employs custom logic for these operations.

Attributes:

_entity_path (str): The API path for data sources.
_entity_model (type[DataSource]): Pydantic model for a single data source.
_response_model (type[DataSourceResponse]): Pydantic model for the search response envelope.
_endpoint_def (dict): Configuration for this endpoint from ENDPOINT_DEFINITIONS.
_valid_sort_fields (set[str]): Valid sort fields for this endpoint.

Source code in src/aireloom/resources/data_sources_client.py
class DataSourcesClient(BaseResourceClient):
    """Client for the OpenAIRE Data Sources API endpoint.

    This client allows interaction with OpenAIRE's data source entities,
    offering methods for retrieval (`get`), searching (`search`), and iteration
    (`iterate`). It currently employs custom logic for these operations.

    Attributes:
        _entity_path (str): The API path for data sources.
        _entity_model (type[DataSource]): Pydantic model for a single data source.
        _response_model (type[DataSourceResponse]): Pydantic model for the
                                                     search response envelope.
        _endpoint_def (dict): Configuration for this endpoint from `ENDPOINT_DEFINITIONS`.
        _valid_sort_fields (set[str]): Valid sort fields for this endpoint.
    """

    _entity_path: str = DATA_SOURCES
    _entity_model: type[DataSource] = DataSource
    _response_model: type[DataSourceResponse] = DataSourceResponse

    def __init__(self, api_client: "AireloomClient"):
        """Initializes the DataSourcesClient.

        Args:
            api_client: An instance of AireloomClient.
        """
        super().__init__(api_client)
        if self._entity_path not in ENDPOINT_DEFINITIONS:
            raise ValueError(
                f"Missing endpoint definition for entity path: {self._entity_path}"
            )
        self._endpoint_def = ENDPOINT_DEFINITIONS[self._entity_path]
        self._valid_sort_fields = self._endpoint_def.get(
            "sort", {}
        ).keys()  # Get sort fields
        logger.debug(f"DataSourcesClient initialized for path: {self._entity_path}")

    # _validate_filters and _validate_and_convert_filter_value are removed as Pydantic handles this.

    def _validate_sort(self, sort_by: str | None) -> None:
        """Validates the sort field against endpoint definitions."""
        if not sort_by:
            return

        if not self._valid_sort_fields:
            logger.warning(
                f"Sort field '{sort_by}' provided for {self._entity_path}, "
                "but no sort fields are defined. Ignoring sort."
            )
            return
        sort_field_name = sort_by.split()[0]
        if sort_field_name not in self._valid_sort_fields:
            raise ValidationError(
                f"Invalid sort field for {self._entity_path}: '{sort_field_name}'. "
                f"Valid fields: {list(self._valid_sort_fields)}"
            )

    def _build_params(
        self,
        page: int | None,
        page_size: int,
        sort_by: str | None,
        filters: dict[str, Any] | None,  # Changed to Optional[dict]
        *,
        is_iteration: bool = False,
    ) -> dict[str, Any]:
        """Builds the query parameter dictionary."""
        params: dict[str, Any] = {"pageSize": page_size}
        if is_iteration:
            params["cursor"] = "*"
        elif page is not None:
            params["page"] = page
        if sort_by:
            params["sortBy"] = sort_by
        if filters:
            params.update(filters)
        return {k: v for k, v in params.items() if v is not None}

    async def _fetch_single_entity_impl(self, entity_id: str) -> DataSource:
        """Generic method to fetch a single entity by ID using search-by-ID."""
        try:
            # Use search with ID parameter instead of direct GET
            params = {"id": entity_id, "pageSize": 1}
            response = await self._api_client.request(
                "GET", self._entity_path, params=params, data=None, json_data=None
            )
            data = response.json()

            # Parse the search response
            search_response = self._response_model.model_validate(data)

            if not search_response.results:
                raise BibliofabricError(
                    f"{self._entity_model.__name__} with ID '{entity_id}' not found."
                )

            # Return the first (and should be only) result
            return search_response.results[0]

        except httpx.HTTPStatusError as e:
            logger.error(
                f"HTTPStatusError for {self._entity_model.__name__} ID '{entity_id}': {e.response.status_code}"
            )
            raise BibliofabricError(
                f"API error fetching {self._entity_model.__name__} {entity_id}: "
                f"Status {e.response.status_code}"
            ) from e
        except BibliofabricError:
            # Let library errors (such as the not-found error above) propagate unchanged.
            raise
        except Exception as e:
            logger.exception(
                f"Failed to fetch {self._entity_model.__name__} {entity_id} from {self._entity_path}"
            )
            raise BibliofabricError(
                f"Unexpected error fetching {self._entity_model.__name__} {entity_id}: {e}"
            ) from e

    async def _search_entities_impl(self, params: dict[str, Any]) -> DataSourceResponse:
        """Generic method to search for entities."""
        try:
            response = await self._api_client.request(
                "GET", self._entity_path, params=params, data=None, json_data=None
            )
            return self._response_model.model_validate(response.json())
        except (BibliofabricError, ValidationError):
            # Library and validation errors propagate unchanged.
            raise
        except Exception as e:
            logger.exception(
                f"Failed to search {self._entity_path} with params {params}"
            )
            raise BibliofabricError(
                f"Unexpected error searching {self._entity_path}: {e}"
            ) from e

    async def _iterate_entities_impl(
        self, params: dict[str, Any]
    ) -> AsyncIterator[DataSource]:
        """Generic method to iterate through all results using cursor pagination."""
        current_params = params.copy()
        while True:
            try:
                logger.debug(
                    f"Iterating {self._entity_path} with params: {current_params}"
                )
                response = await self._api_client.request(
                    "GET",
                    self._entity_path,
                    params=current_params,
                    data=None,
                    json_data=None,
                )
                data = response.json()
                api_response = ApiResponse[self._entity_model].model_validate(data)
                if not api_response.results:
                    logger.debug(
                        f"No more results for {self._entity_path}, stopping iteration."
                    )
                    break
                for result in api_response.results:
                    yield result
                next_cursor = api_response.header.nextCursor
                if not next_cursor:
                    logger.debug(
                        f"No nextCursor for {self._entity_path}, stopping iteration."
                    )
                    break
                current_params["cursor"] = next_cursor
                current_params.pop("page", None)
            except (BibliofabricError, ValidationError):
                # Library and validation errors propagate unchanged.
                raise
            except Exception as e:
                logger.exception(
                    f"Failed during iteration of {self._entity_path} with params {current_params}"
                )
                raise BibliofabricError(
                    f"Unexpected error during iteration of {self._entity_path}: {e}"
                ) from e

    async def get(self, source_id: str) -> DataSource:
        """Retrieves a single Data Source by its ID.

        Args:
            source_id: The ID of the data source.

        Returns:
            A DataSource object.
        """
        logger.info(f"Fetching data source with ID: {source_id}")
        return await self._fetch_single_entity_impl(source_id)

    async def search(
        self,
        page: int = 1,
        page_size: int = DEFAULT_PAGE_SIZE,
        sort_by: str | None = None,
        filters: DataSourcesFilters | None = None,  # Changed to Pydantic model
    ) -> DataSourceResponse:
        """Searches for Data Sources.

        Args:
            page: Page number (1-indexed).
            page_size: Number of results per page.
            sort_by: Field to sort by.
            filters: An instance of DataSourcesFilters with filter criteria.

        Returns:
            A DataSourceResponse object.
        """
        filter_dict = (
            filters.model_dump(exclude_none=True, by_alias=True) if filters else {}
        )
        logger.info(
            f"Searching data sources: page={page}, size={page_size}, sort='{sort_by}', "
            f"filters={filter_dict}"
        )
        # self._validate_filters is removed
        self._validate_sort(sort_by)
        params = self._build_params(
            page=page, page_size=page_size, sort_by=sort_by, filters=filter_dict
        )
        return await self._search_entities_impl(params)

    async def iterate(
        self,
        page_size: int = 100,
        sort_by: str | None = None,
        filters: DataSourcesFilters | None = None,  # Changed to Pydantic model
    ) -> AsyncIterator[DataSource]:
        """Iterates through all Data Source results.

        Args:
            page_size: Number of results per page during iteration.
            sort_by: Field to sort by.
            filters: An instance of DataSourcesFilters with filter criteria.

        Yields:
            DataSource objects.
        """
        filter_dict = (
            filters.model_dump(exclude_none=True, by_alias=True) if filters else {}
        )
        logger.info(
            f"Iterating data sources: size={page_size}, sort='{sort_by}', "
            f"filters={filter_dict}"
        )
        # self._validate_filters is removed
        self._validate_sort(sort_by)
        params = self._build_params(
            page=None,
            page_size=page_size,
            sort_by=sort_by,
            filters=filter_dict,
            is_iteration=True,
        )
        async for item in self._iterate_entities_impl(params):
            yield item

__init__(api_client)

Initializes the DataSourcesClient.

Parameters:

api_client (AireloomClient, required): An instance of AireloomClient.

Source code in src/aireloom/resources/data_sources_client.py
def __init__(self, api_client: "AireloomClient"):
    """Initializes the DataSourcesClient.

    Args:
        api_client: An instance of AireloomClient.
    """
    super().__init__(api_client)
    if self._entity_path not in ENDPOINT_DEFINITIONS:
        raise ValueError(
            f"Missing endpoint definition for entity path: {self._entity_path}"
        )
    self._endpoint_def = ENDPOINT_DEFINITIONS[self._entity_path]
    self._valid_sort_fields = self._endpoint_def.get(
        "sort", {}
    ).keys()  # Get sort fields
    logger.debug(f"DataSourcesClient initialized for path: {self._entity_path}")

get(source_id) async

Retrieves a single Data Source by its ID.

Parameters:

source_id (str, required): The ID of the data source.

Returns:

DataSource: A DataSource object.

Source code in src/aireloom/resources/data_sources_client.py
async def get(self, source_id: str) -> DataSource:
    """Retrieves a single Data Source by its ID.

    Args:
        source_id: The ID of the data source.

    Returns:
        A DataSource object.
    """
    logger.info(f"Fetching data source with ID: {source_id}")
    return await self._fetch_single_entity_impl(source_id)
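Internally, `get` does not issue a direct lookup; it searches with an `id` parameter and `pageSize` of 1, then returns the first result. The sketch below illustrates that pattern with the standard library only. `fake_request`, `NotFoundError`, and the sample record are hypothetical stand-ins for the real transport, the library's error type, and an API response.

```python
import asyncio
from typing import Any


class NotFoundError(Exception):
    """Stand-in for the library's error type."""


async def fake_request(params: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical transport: returns a search envelope for one known ID."""
    known = {"ds-1": {"id": "ds-1", "officialName": "Example Repository"}}
    hit = known.get(params["id"])
    return {"results": [hit] if hit else []}


async def get_by_id(entity_id: str) -> dict[str, Any]:
    # Ask the search endpoint for at most one record matching the ID.
    envelope = await fake_request({"id": entity_id, "pageSize": 1})
    if not envelope["results"]:
        # An empty result list is treated as "not found".
        raise NotFoundError(f"DataSource with ID '{entity_id}' not found.")
    return envelope["results"][0]


found = asyncio.run(get_by_id("ds-1"))
print(found["officialName"])
```

One consequence of this design is that an unknown ID surfaces as an empty result list rather than an HTTP error, so the not-found case has to be raised explicitly.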

iterate(page_size=100, sort_by=None, filters=None) async

Iterates through all Data Source results.

Parameters:

page_size (int, default 100): Number of results per page during iteration.
sort_by (str | None, default None): Field to sort by.
filters (DataSourcesFilters | None, default None): An instance of DataSourcesFilters with filter criteria.

Yields:

AsyncIterator[DataSource]: DataSource objects.

Source code in src/aireloom/resources/data_sources_client.py
async def iterate(
    self,
    page_size: int = 100,
    sort_by: str | None = None,
    filters: DataSourcesFilters | None = None,  # Changed to Pydantic model
) -> AsyncIterator[DataSource]:
    """Iterates through all Data Source results.

    Args:
        page_size: Number of results per page during iteration.
        sort_by: Field to sort by.
        filters: An instance of DataSourcesFilters with filter criteria.

    Yields:
        DataSource objects.
    """
    filter_dict = (
        filters.model_dump(exclude_none=True, by_alias=True) if filters else {}
    )
    logger.info(
        f"Iterating data sources: size={page_size}, sort='{sort_by}', "
        f"filters={filter_dict}"
    )
    # self._validate_filters is removed
    self._validate_sort(sort_by)
    params = self._build_params(
        page=None,
        page_size=page_size,
        sort_by=sort_by,
        filters=filter_dict,
        is_iteration=True,
    )
    async for item in self._iterate_entities_impl(params):
        yield item
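Iteration relies on cursor pagination: the first request sends `cursor="*"`, and each response supplies the cursor for the next request until there are no results or no next cursor. The sketch below models that loop with the standard library only. `PAGES` and `fake_request` are hypothetical, and the response shape is flattened for brevity (in the client, the next cursor is read from `header.nextCursor`).

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

# Hypothetical server state: three pages keyed by the cursor that requests them.
PAGES: dict[str, dict[str, Any]] = {
    "*": {"results": ["a", "b"], "nextCursor": "c1"},
    "c1": {"results": ["c", "d"], "nextCursor": "c2"},
    "c2": {"results": ["e"], "nextCursor": None},
}


async def fake_request(params: dict[str, Any]) -> dict[str, Any]:
    """Stand-in for the HTTP call; looks the page up by cursor."""
    return PAGES[params["cursor"]]


async def iterate(page_size: int = 2) -> AsyncIterator[str]:
    params: dict[str, Any] = {"pageSize": page_size, "cursor": "*"}
    while True:
        page = await fake_request(params)
        if not page["results"]:
            break  # an empty page ends the iteration
        for item in page["results"]:
            yield item
        if not page["nextCursor"]:
            break  # no cursor means this was the last page
        params["cursor"] = page["nextCursor"]


async def collect() -> list[str]:
    return [item async for item in iterate()]


items = asyncio.run(collect())
print(items)
```

Callers consume the real method the same way, with `async for source in client.iterate(...)`, and never handle cursors themselves.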

search(page=1, page_size=DEFAULT_PAGE_SIZE, sort_by=None, filters=None) async

Searches for Data Sources.

Parameters:

page (int, default 1): Page number (1-indexed).
page_size (int, default DEFAULT_PAGE_SIZE): Number of results per page.
sort_by (str | None, default None): Field to sort by.
filters (DataSourcesFilters | None, default None): An instance of DataSourcesFilters with filter criteria.

Returns:

DataSourceResponse: A DataSourceResponse object.

Source code in src/aireloom/resources/data_sources_client.py
async def search(
    self,
    page: int = 1,
    page_size: int = DEFAULT_PAGE_SIZE,
    sort_by: str | None = None,
    filters: DataSourcesFilters | None = None,  # Changed to Pydantic model
) -> DataSourceResponse:
    """Searches for Data Sources.

    Args:
        page: Page number (1-indexed).
        page_size: Number of results per page.
        sort_by: Field to sort by.
        filters: An instance of DataSourcesFilters with filter criteria.

    Returns:
        A DataSourceResponse object.
    """
    filter_dict = (
        filters.model_dump(exclude_none=True, by_alias=True) if filters else {}
    )
    logger.info(
        f"Searching data sources: page={page}, size={page_size}, sort='{sort_by}', "
        f"filters={filter_dict}"
    )
    # self._validate_filters is removed
    self._validate_sort(sort_by)
    params = self._build_params(
        page=page, page_size=page_size, sort_by=sort_by, filters=filter_dict
    )
    return await self._search_entities_impl(params)