from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Tuple, Iterable
from .models.actions.catalogs.catalogAdded import Marker
from .models.actions.catalogs.catalogAdded import Model as CatalogAdded
from .models.actions.catalogs.catalogDeleted import Model as CatalogDeleted
from .models.actions.catalogs.catalogUpdated import Model as CatalogUpdated
from .tinyid import tinyid
from .vec3 import SkyCoord
from .window import Window
MarkerType = Literal['asterisk', 'circle', 'circledHollowAsterisk', 'circledHollowPlus', 'circledHollowX', 'diamond', 'dot', 'hollowAsterisk', 'hollowPlus', 'hollowX', 'pentagon', 'plus', 'square', 'triangle', 'x']
Color = List[float]
V3 = List[float]
if TYPE_CHECKING: # pragma: no cover
from pandas import DataFrame as pdDataFrame
else:
pdDataFrame = None
[docs]
@dataclass
class CatalogManager:
_w: Window
@property
def members(self) -> List['Catalog']:
self._w.sync()
return [Catalog(c['id'], self._w) for c in self._w._store_state['catalogs']['catalogs']]
[docs]
def clear(self) -> None:
for catalog in self.members:
catalog.delete()
def _new(
self,
*,
name: Optional[str] = None,
table: Dict[str, List[str]],
xyz: List[V3],
color: Optional[List[Optional[Color]]] = None,
type: Optional[Optional[List[MarkerType]]] = None,
base_color: Optional[Color] = None,
default_marker_type: MarkerType = 'circle',
open_catalog_table=False,
) -> 'Catalog':
id = tinyid()
fields = [*table.keys()]
name = name or f'Catalog {len(self.members) + 1}'
# attributesの全ての値が同じ長さであることを確認
for values in table.values():
if len(values) != len(xyz): # pragma: no cover
raise ValueError('All attributes must have the same length as coords')
# colors, typesも同じ長さであることを確認
if color is not None and len(color) != len(xyz): # pragma: no cover
raise ValueError('All colors must have the same length as coords')
if type is not None and len(type) != len(xyz): # pragma: no cover
raise ValueError('All types must have the same length as coords')
markers: List[Marker] = [
Marker(
position=xyz[i],
color=color[i] if color else None,
type=type[i] if type else None,
)
for i in range(len(xyz))
]
attributes = [[table[f][i] for f in table.keys()] for i in range(len(xyz))]
action = CatalogAdded(
type='catalogs/catalogAdded',
payload={
'openDialog': open_catalog_table,
'params': {
'id': id,
'name': name,
'attributes': attributes,
'baseColor': base_color,
'defaultType': default_marker_type,
'fields': fields,
'markers': markers,
},
},
)
self._w._dispatch(action)
return Catalog(id, self._w)
[docs]
def from_pandas(
self,
df: pdDataFrame,
*,
name: Optional[str] = None,
coord_column: Optional[Tuple[str, str]] = None,
color_column: Optional[str] = None,
type_column: Optional[str] = None,
base_color: Optional[Color] = None,
default_marker_type: MarkerType = 'circle',
open_catalog_table=False,
) -> 'Catalog':
w = self._w
if coord_column is None: # pragma: no branch
coord_column = ('ra', 'dec')
ras = df[coord_column[0]]
decs = df[coord_column[1]]
xyz = [
SkyCoord(
w._angle_input(ra),
w._angle_input(dec),
)
.as_vec3()
.as_list()
for ra, dec in zip(ras, decs)
]
color = df[color_column].tolist() if color_column else None
type = df[type_column].tolist() if type_column else None
table = {k: [str(value) for value in df[k].tolist()] for k in df.columns}
return self._new(
name=name,
table=table,
xyz=xyz,
color=color,
type=type,
base_color=base_color,
default_marker_type=default_marker_type,
open_catalog_table=open_catalog_table,
)
[docs]
def new(
self,
ra: List[float],
dec: List[float],
*,
name: Optional[str] = None,
color: Optional[List[Optional[Color]]] = None,
type: Optional[Optional[List[MarkerType]]] = None,
base_color: Optional[Color] = None,
default_marker_type: MarkerType = 'circle',
open_catalog_table=False,
) -> 'Catalog':
# Check that the length of ra and dec is the same
if len(ra) != len(dec): # pragma: no cover
raise ValueError('ra and dec must have the same length')
return self._new(
name=name,
table={},
xyz=[SkyCoord(self._w._angle_input(r), self._w._angle_input(d)).as_vec3().as_list() for r, d in zip(ra, dec)],
color=color,
type=type,
base_color=base_color,
default_marker_type=default_marker_type,
open_catalog_table=open_catalog_table,
)
[docs]
@dataclass
class Catalog:
id: str
_w: Window
[docs]
def delete(self) -> None:
self._w.sync()
self._w._dispatch(
CatalogDeleted(
type='catalogs/catalogDeleted',
payload={'id': self.id},
)
)
def _sync(self):
self._w.sync()
def _state(self):
self._sync()
for catalog in self._w._store_state['catalogs']['catalogs']:
if catalog['id'] == self.id: # pragma: no branch
return catalog
raise ValueError(f'Catalog {self.id} not found') # pragma: no cover
def _update(self, **kwargs):
self._w._dispatch(
CatalogUpdated(
type='catalogs/catalogUpdated',
payload={**kwargs, 'id': self.id}, # type: ignore
)
)
@property
def name(self) -> str:
self._sync()
return self._state()['name']
@name.setter
def name(self, new_name: str) -> None:
self._update(name=new_name)
@property
def color(self) -> Color:
self._sync()
return self._state()['baseColor']
@color.setter
def color(self, new_color: Color) -> None:
self._update(baseColor=new_color)
@property
def marker(self) -> MarkerType:
self._sync()
return self._state()['defaultType']
@marker.setter
def marker(self, new_marker: MarkerType) -> None:
self._update(defaultType=new_marker)
@property
def column_names(self) -> List[str]:
self._sync()
return self._state()['fields']
@property
def visible(self) -> bool:
self._sync()
return self._state()['visible']
@visible.setter
def visible(self, new_visible: bool) -> None:
self._update(visible=new_visible)
@property
def selected_indices(self) -> List[int]:
self._sync()
return [*map(int, self._state()['selectedRecords'].keys())]
@selected_indices.setter
def selected_indices(self, indicse: Iterable[int]) -> None:
# TODO: OPIMIZE
from hscmap.models.actions.catalogs.recordSelected import Model as RecordSelected
def toggle(index: int, selected: bool):
self._w._dispatch(
RecordSelected(
type='catalogs/recordSelected',
payload={
'id': self.id,
'index': index,
'selected': selected,
},
)
)
current = set(self.selected_indices)
new = set(indicse)
for index in current - new:
toggle(index, False)
for index in new - current:
toggle(index, True)
[docs]
def as_pandas_dataframe(self) -> 'pdDataFrame':
import pandas
fields = self._state()['fields']
rows = self._state()['attributes']
return pandas.DataFrame(rows, columns=fields)
[docs]
def sample_pandas_data(n_rows: int = 500):
import numpy
import pandas
# pandasでサンプル用のカタログデータを作る
# カラムは ID, ra, dec, mag
# 行数はn_rows
# 内容は適当に乱数で埋める
# 乱数の範囲は
# ID: [0, nrows)
# ra: [-0.5, 0.5]
# dec: [-0.5, 0.5]
# mag: [15, 25]
# とする
# w.catalogs.from_pandas(df) でカタログを作成できるようにする
df = pandas.DataFrame(
{
'ID': range(n_rows),
'ra': (numpy.random.rand(n_rows) - 0.5) * 1,
'dec': (numpy.random.rand(n_rows) - 0.5) * 1,
'mag': (numpy.random.rand(n_rows) * 10) + 15,
}
)
return df